From b2af09060b3b755637cce00ff57db7b71546bb0d Mon Sep 17 00:00:00 2001 From: Chris Hogan Date: Thu, 4 Feb 2021 09:50:03 -0600 Subject: [PATCH] Bug fixes for Ares --- src/api/bucket.cc | 6 ++- src/api/hermes.cc | 4 +- src/buffer_pool.cc | 14 ++++--- src/metadata_management.cc | 28 ++++++------- src/metadata_management.h | 6 +++ src/rpc_thallium.cc | 80 +++++++++++++++++--------------------- 6 files changed, 69 insertions(+), 69 deletions(-) diff --git a/src/api/bucket.cc b/src/api/bucket.cc index 09326b0f4..c8ab7b255 100644 --- a/src/api/bucket.cc +++ b/src/api/bucket.cc @@ -87,8 +87,10 @@ size_t Bucket::GetBlobSize(Arena *arena, const std::string &name, << name_ << '\n'; BlobID blob_id = GetBlobIdByName(&hermes_->context_, &hermes_->rpc_, name.c_str()); - result = GetBlobSizeById(&hermes_->context_, &hermes_->rpc_, arena, - blob_id); + if (!IsNullBlobId(blob_id)) { + result = GetBlobSizeById(&hermes_->context_, &hermes_->rpc_, arena, + blob_id); + } } return result; diff --git a/src/api/hermes.cc b/src/api/hermes.cc index 8ba75dece..e9a948610 100644 --- a/src/api/hermes.cc +++ b/src/api/hermes.cc @@ -294,9 +294,7 @@ std::shared_ptr InitHermes(Config *config, bool is_daemon, api::Context::default_buffer_organizer_retries = config->num_buffer_organizer_retries; - if (comm.proc_kind == ProcessKind::kApp) { - InitRpcClients(&result->rpc_); - } + InitRpcClients(&result->rpc_); return result; } diff --git a/src/buffer_pool.cc b/src/buffer_pool.cc index 7f65e9acf..1b7f2c8ab 100644 --- a/src/buffer_pool.cc +++ b/src/buffer_pool.cc @@ -82,8 +82,9 @@ void Finalize(SharedMemoryContext *context, CommunicationContext *comm, RpcContext *rpc, const char *shmem_name, Arena *trans_arena, bool is_application_core, bool force_rpc_shutdown) { WorldBarrier(comm); + ShutdownRpcClients(rpc); + if (is_application_core) { - ShutdownRpcClients(rpc); ReleaseSharedMemoryContext(context); HERMES_DEBUG_CLIENT_CLOSE(); } @@ -491,7 +492,7 @@ void ReleaseBuffer(SharedMemoryContext *context, RpcContext *rpc, if (target_node == rpc->node_id) { LocalReleaseBuffer(context, buffer_id); } else { - RpcCall(rpc, target_node, "RemoteReleaseBuffer", buffer_id); + RpcCall(rpc, target_node, "RemoteReleaseBuffer", buffer_id); } } @@ -1474,12 +1475,13 @@ size_t ReadBlobFromBuffers(SharedMemoryContext *context, RpcContext *rpc, assert(bytes_transferred == buffer_sizes[i]); bytes_read += bytes_transferred; } else { - std::vector data = RpcCall>(rpc, id.bits.node_id, - "RemoteReadBufferById", - id, buffer_sizes[i]); + std::vector data = + RpcCall>(rpc, id.bits.node_id, "RemoteReadBufferById", + id); bytes_read = data.size(); // TODO(chogan): @optimization Avoid the copy - memcpy(blob->data, data.data(), bytes_read); + u8 *read_dest = (u8 *)blob->data + total_bytes_read; + memcpy(read_dest, data.data(), bytes_read); } } else { bytes_read = LocalReadBufferById(context, id, blob, total_bytes_read); diff --git a/src/metadata_management.cc b/src/metadata_management.cc index 38eb92668..a7a7531e8 100644 --- a/src/metadata_management.cc +++ b/src/metadata_management.cc @@ -184,7 +184,7 @@ void PutId(MetadataManager *mdm, RpcContext *rpc, const std::string &name, if (target_node == rpc->node_id) { LocalPut(mdm, name.c_str(), id, map_type); } else { - RpcCall(rpc, target_node, "RemotePut", name, id, map_type); + RpcCall(rpc, target_node, "RemotePut", name, id, map_type); } } @@ -195,7 +195,7 @@ void DeleteId(MetadataManager *mdm, RpcContext *rpc, const std::string &name, if (target_node == rpc->node_id) { LocalDelete(mdm, name.c_str(), map_type); } else { - RpcCall(rpc, target_node, "RemoteDelete", name, map_type); + RpcCall(rpc, target_node, "RemoteDelete", name, map_type); } } @@ -354,7 +354,7 @@ void AddBlobIdToBucket(MetadataManager *mdm, RpcContext *rpc, BlobID blob_id, if (target_node == rpc->node_id) { LocalAddBlobIdToBucket(mdm, bucket_id, blob_id); } else { - RpcCall(rpc, target_node, "RemoteAddBlobIdToBucket", bucket_id, + RpcCall(rpc, target_node, "RemoteAddBlobIdToBucket", bucket_id, blob_id); } } @@ -366,7 +366,7 @@ void AddBlobIdToVBucket(MetadataManager *mdm, RpcContext *rpc, BlobID blob_id, if (target_node == rpc->node_id) { LocalAddBlobIdToVBucket(mdm, vbucket_id, blob_id); } else { - RpcCall(rpc, target_node, "RemoteAddBlobIdToVBucket", vbucket_id, + RpcCall(rpc, target_node, "RemoteAddBlobIdToVBucket", vbucket_id, blob_id); } } @@ -487,7 +487,7 @@ void FreeBufferIdList(SharedMemoryContext *context, RpcContext *rpc, if (target_node == rpc->node_id) { LocalFreeBufferIdList(context, blob_id); } else { - RpcCall(rpc, target_node, "RemoteFreeBufferIdList", blob_id); + RpcCall(rpc, target_node, "RemoteFreeBufferIdList", blob_id); } } @@ -536,7 +536,7 @@ void RemoveBlobFromBucketInfo(SharedMemoryContext *context, RpcContext *rpc, if (target_node == rpc->node_id) { LocalRemoveBlobFromBucketInfo(context, bucket_id, blob_id); } else { - RpcCall(rpc, target_node, "RemoteRemoveBlobFromBucketInfo", bucket_id, + RpcCall(rpc, target_node, "RemoteRemoveBlobFromBucketInfo", bucket_id, blob_id); } } @@ -550,7 +550,7 @@ void DestroyBlobByName(SharedMemoryContext *context, RpcContext *rpc, if (blob_id_target_node == rpc->node_id) { LocalDestroyBlobByName(context, rpc, blob_name.c_str(), blob_id); } else { - RpcCall(rpc, blob_id_target_node, "RemoteDestroyBlobByName", + RpcCall(rpc, blob_id_target_node, "RemoteDestroyBlobByName", blob_name, blob_id); } RemoveBlobFromBucketInfo(context, rpc, bucket_id, blob_id); @@ -592,7 +592,7 @@ void DestroyBlobById(SharedMemoryContext *context, RpcContext *rpc, BlobID id) { if (target_node == rpc->node_id) { LocalDestroyBlobById(context, rpc, id); } else { - RpcCall(rpc, target_node, "RemoteDestroyBlobById", id); + RpcCall(rpc, target_node, "RemoteDestroyBlobById", id); } } @@ -624,7 +624,7 @@ void RenameBucket(SharedMemoryContext *context, RpcContext *rpc, BucketID id, if (target_node == rpc->node_id) { LocalRenameBucket(context, rpc, id, old_name.c_str(), new_name.c_str()); } else { - RpcCall(rpc, target_node, "RemoteRenameBucket", id, old_name, + RpcCall(rpc, target_node, "RemoteRenameBucket", id, old_name, new_name); } } @@ -641,7 +641,7 @@ void IncrementRefcount(SharedMemoryContext *context, RpcContext *rpc, if (target_node == rpc->node_id) { LocalIncrementRefcount(context, id); } else { - RpcCall(rpc, target_node, "RemoteIncrementRefcount", id); + RpcCall(rpc, target_node, "RemoteIncrementRefcount", id); } } @@ -658,7 +658,7 @@ void DecrementRefcount(SharedMemoryContext *context, RpcContext *rpc, if (target_node == rpc->node_id) { LocalDecrementRefcount(context, id); } else { - RpcCall(rpc, target_node, "RemoteDecrementRefcount", id); + RpcCall(rpc, target_node, "RemoteDecrementRefcount", id); } } @@ -765,7 +765,7 @@ void UpdateGlobalSystemViewState(SharedMemoryContext *context, if (target_node == rpc->node_id) { LocalUpdateGlobalSystemViewState(context, adjustments); } else { - RpcCall(rpc, target_node, "RemoteUpdateGlobalSystemViewState", + RpcCall(rpc, target_node, "RemoteUpdateGlobalSystemViewState", adjustments); } } @@ -954,7 +954,7 @@ void IncrementRefcount(SharedMemoryContext *context, RpcContext *rpc, if (target_node == rpc->node_id) { LocalIncrementRefcount(context, id); } else { - RpcCall(rpc, target_node, "RemoteIncrementRefcountVBucket", id); + RpcCall(rpc, target_node, "RemoteIncrementRefcountVBucket", id); } } @@ -971,7 +971,7 @@ void DecrementRefcount(SharedMemoryContext *context, RpcContext *rpc, if (target_node == rpc->node_id) { LocalDecrementRefcount(context, id); } else { - RpcCall(rpc, target_node, "RemoteDecrementRefcountVBucket", id); + RpcCall(rpc, target_node, "RemoteDecrementRefcountVBucket", id); } } diff --git a/src/metadata_management.h b/src/metadata_management.h index 801b6ace4..3cf54d96e 100644 --- a/src/metadata_management.h +++ b/src/metadata_management.h @@ -284,6 +284,12 @@ void IncrementRefcount(SharedMemoryContext *context, RpcContext *rpc, */ void DecrementRefcount(SharedMemoryContext *context, RpcContext *rpc, VBucketID id); + +/** + * + */ +bool IsNullBlobId(BlobID id); + } // namespace hermes #endif // HERMES_METADATA_MANAGEMENT_H_ diff --git a/src/rpc_thallium.cc b/src/rpc_thallium.cc index 5fef02567..3297e0e5f 100644 --- a/src/rpc_thallium.cc +++ b/src/rpc_thallium.cc @@ -77,8 +77,8 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc, function rpc_release_buffer = [context](const request &req, BufferID id) { - (void)req; LocalReleaseBuffer(context, id); + req.respond(true); }; function rpc_split_buffers = @@ -180,31 +180,32 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc, function rpc_map_put = [context](const request &req, const string &name, u64 val, const MapType &map_type) { - (void)req; MetadataManager *mdm = GetMetadataManagerFromContext(context); LocalPut(mdm, name.c_str(), val, map_type); + req.respond(true); }; function rpc_map_delete = [context](const request &req, string name, const MapType &map_type) { - (void)req; MetadataManager *mdm = GetMetadataManagerFromContext(context); LocalDelete(mdm, name.c_str(), map_type); + req.respond(true); }; function rpc_add_blob_bucket = [context](const request &req, BucketID bucket_id, BlobID blob_id) { - (void)req; MetadataManager *mdm = GetMetadataManagerFromContext(context); LocalAddBlobIdToBucket(mdm, bucket_id, blob_id); + req.respond(true); }; function rpc_add_blob_vbucket = [context](const request &req, VBucketID vbucket_id, BlobID blob_id) { - (void)req; MetadataManager *mdm = GetMetadataManagerFromContext(context); LocalAddBlobIdToVBucket(mdm, vbucket_id, blob_id); + req.respond(true); }; + function rpc_get_buffer_id_list = [context](const request &req, BlobID blob_id) { MetadataManager *mdm = GetMetadataManagerFromContext(context); @@ -215,8 +216,8 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc, function rpc_free_buffer_id_list = [context](const request &req, BlobID blob_id) { - (void)req; LocalFreeBufferIdList(context, blob_id); + req.respond(true); }; function rpc_destroy_bucket = @@ -247,21 +248,21 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc, rpc_rename_bucket = [context, rpc](const request &req, BucketID id, const string &old_name, const string &new_name) { - (void)req; LocalRenameBucket(context, rpc, id, old_name.c_str(), new_name.c_str()); + req.respond(true); }; function rpc_destroy_blob_by_name = [context, rpc](const request &req, const string &name, BlobID id) { - (void)req; LocalDestroyBlobByName(context, rpc, name.c_str(), id); + req.respond(true); }; function rpc_destroy_blob_by_id = [context, rpc](const request &req, BlobID id) { - (void)req; LocalDestroyBlobById(context, rpc, id); + req.respond(true); }; function rpc_contains_blob = @@ -274,32 +275,32 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc, rpc_remove_blob_from_bucket_info = [context](const request &req, BucketID bucket_id, BlobID blob_id) { - (void)req; LocalRemoveBlobFromBucketInfo(context, bucket_id, blob_id); + req.respond(true); }; function rpc_increment_refcount_bucket = [context](const request &req, BucketID id) { - (void)req; LocalIncrementRefcount(context, id); + req.respond(true); }; function rpc_decrement_refcount_bucket = [context](const request &req, BucketID id) { - (void)req; LocalDecrementRefcount(context, id); + req.respond(true); }; function rpc_increment_refcount_vbucket = [context](const request &req, VBucketID id) { - (void)req; LocalIncrementRefcount(context, id); + req.respond(true); }; function rpc_decrement_refcount_vbucket = [context](const request &req, VBucketID id) { - (void)req; LocalDecrementRefcount(context, id); + req.respond(true); }; function rpc_get_global_device_capacities = @@ -321,8 +322,8 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc, function)> rpc_update_global_system_view_state = [context](const request &req, std::vector adjustments) { - (void)req; LocalUpdateGlobalSystemViewState(context, adjustments); + req.respond(true); }; function rpc_get_blob_ids = @@ -345,8 +346,7 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc, rpc_server->define("MergeBuffers", rpc_merge_buffers).disable_response(); // - rpc_server->define("RemoteReleaseBuffer", - rpc_release_buffer).disable_response(); + rpc_server->define("RemoteReleaseBuffer", rpc_release_buffer); rpc_server->define("RemoteGetBufferSize", rpc_get_buffer_size); rpc_server->define("RemoteReadBufferById", rpc_read_buffer_by_id); @@ -354,40 +354,30 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc, rpc_server->define("RemoteBulkReadBufferById", rpc_bulk_read_buffer_by_id); rpc_server->define("RemoteGet", rpc_map_get); - rpc_server->define("RemotePut", rpc_map_put).disable_response(); - rpc_server->define("RemoteDelete", rpc_map_delete).disable_response(); - rpc_server->define("RemoteAddBlobIdToBucket", - rpc_add_blob_bucket).disable_response(); - rpc_server->define("RemoteAddBlobIdToVBucket", - rpc_add_blob_vbucket).disable_response(); + rpc_server->define("RemotePut", rpc_map_put); + rpc_server->define("RemoteDelete", rpc_map_delete); + rpc_server->define("RemoteAddBlobIdToBucket", rpc_add_blob_bucket); + rpc_server->define("RemoteAddBlobIdToVBucket", rpc_add_blob_vbucket); rpc_server->define("RemoteDestroyBucket", rpc_destroy_bucket); - rpc_server->define("RemoteRenameBucket", - rpc_rename_bucket).disable_response(); - rpc_server->define("RemoteDestroyBlobByName", - rpc_destroy_blob_by_name).disable_response(); - rpc_server->define("RemoteDestroyBlobById", - rpc_destroy_blob_by_id).disable_response(); + rpc_server->define("RemoteRenameBucket", rpc_rename_bucket); + rpc_server->define("RemoteDestroyBlobByName", rpc_destroy_blob_by_name); + rpc_server->define("RemoteDestroyBlobById", rpc_destroy_blob_by_id); rpc_server->define("RemoteContainsBlob", rpc_contains_blob); rpc_server->define("RemoteGetNextFreeBucketId", rpc_get_next_free_bucket_id); rpc_server->define("RemoteRemoveBlobFromBucketInfo", - rpc_remove_blob_from_bucket_info).disable_response(); + rpc_remove_blob_from_bucket_info); rpc_server->define("RemoteAllocateBufferIdList", rpc_allocate_buffer_id_list); rpc_server->define("RemoteGetBufferIdList", rpc_get_buffer_id_list); - rpc_server->define("RemoteFreeBufferIdList", - rpc_free_buffer_id_list).disable_response(); - rpc_server->define("RemoteIncrementRefcount", - rpc_increment_refcount_bucket).disable_response(); - rpc_server->define("RemoteDecrementRefcount", - rpc_decrement_refcount_bucket).disable_response(); - rpc_server - ->define("RemoteIncrementRefcountVBucket", - rpc_increment_refcount_vbucket).disable_response(); - rpc_server - ->define("RemoteDecrementRefcountVBucket", - rpc_decrement_refcount_vbucket).disable_response(); + rpc_server->define("RemoteFreeBufferIdList", rpc_free_buffer_id_list); + rpc_server->define("RemoteIncrementRefcount", rpc_increment_refcount_bucket); + rpc_server->define("RemoteDecrementRefcount", rpc_decrement_refcount_bucket); + rpc_server->define("RemoteIncrementRefcountVBucket", + rpc_increment_refcount_vbucket); + rpc_server->define("RemoteDecrementRefcountVBucket", + rpc_decrement_refcount_vbucket); rpc_server->define("RemoteGetRemainingCapacity", rpc_get_remaining_capacity); rpc_server->define("RemoteUpdateGlobalSystemViewState", - rpc_update_global_system_view_state).disable_response(); + rpc_update_global_system_view_state); rpc_server->define("RemoteGetGlobalDeviceCapacities", rpc_get_global_device_capacities); rpc_server->define("RemoteGetBlobIds", rpc_get_blob_ids); @@ -540,7 +530,7 @@ void InitRpcClients(RpcContext *rpc) { (ClientThalliumState *)malloc(sizeof(ClientThalliumState)); std::string protocol = GetProtocol(rpc); // TODO(chogan): This should go in a per-client persistent arena - state->engine = new tl::engine(protocol, THALLIUM_CLIENT_MODE, true); + state->engine = new tl::engine(protocol, THALLIUM_CLIENT_MODE, true, 1); rpc->client_rpc.state = state; } @@ -550,8 +540,10 @@ void ShutdownRpcClients(RpcContext *rpc) { if (state) { if (state->engine) { delete state->engine; + state->engine = 0; } free(state); + state = 0; } }