Skip to content

Commit

Permalink
Merge b2af090 into e431d31
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopherHogan committed Feb 4, 2021
2 parents e431d31 + b2af090 commit 2908518
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 69 deletions.
6 changes: 4 additions & 2 deletions src/api/bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,10 @@ size_t Bucket::GetBlobSize(Arena *arena, const std::string &name,
<< name_ << '\n';
BlobID blob_id = GetBlobIdByName(&hermes_->context_, &hermes_->rpc_,
name.c_str());
result = GetBlobSizeById(&hermes_->context_, &hermes_->rpc_, arena,
blob_id);
if (!IsNullBlobId(blob_id)) {
result = GetBlobSizeById(&hermes_->context_, &hermes_->rpc_, arena,
blob_id);
}
}

return result;
Expand Down
4 changes: 1 addition & 3 deletions src/api/hermes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,7 @@ std::shared_ptr<api::Hermes> InitHermes(Config *config, bool is_daemon,
api::Context::default_buffer_organizer_retries =
config->num_buffer_organizer_retries;

if (comm.proc_kind == ProcessKind::kApp) {
InitRpcClients(&result->rpc_);
}
InitRpcClients(&result->rpc_);

return result;
}
Expand Down
14 changes: 8 additions & 6 deletions src/buffer_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ void Finalize(SharedMemoryContext *context, CommunicationContext *comm,
RpcContext *rpc, const char *shmem_name, Arena *trans_arena,
bool is_application_core, bool force_rpc_shutdown) {
WorldBarrier(comm);
ShutdownRpcClients(rpc);

if (is_application_core) {
ShutdownRpcClients(rpc);
ReleaseSharedMemoryContext(context);
HERMES_DEBUG_CLIENT_CLOSE();
}
Expand Down Expand Up @@ -491,7 +492,7 @@ void ReleaseBuffer(SharedMemoryContext *context, RpcContext *rpc,
if (target_node == rpc->node_id) {
LocalReleaseBuffer(context, buffer_id);
} else {
RpcCall<void>(rpc, target_node, "RemoteReleaseBuffer", buffer_id);
RpcCall<bool>(rpc, target_node, "RemoteReleaseBuffer", buffer_id);
}
}

Expand Down Expand Up @@ -1474,12 +1475,13 @@ size_t ReadBlobFromBuffers(SharedMemoryContext *context, RpcContext *rpc,
assert(bytes_transferred == buffer_sizes[i]);
bytes_read += bytes_transferred;
} else {
std::vector<u8> data = RpcCall<std::vector<u8>>(rpc, id.bits.node_id,
"RemoteReadBufferById",
id, buffer_sizes[i]);
std::vector<u8> data =
RpcCall<std::vector<u8>>(rpc, id.bits.node_id, "RemoteReadBufferById",
id);
bytes_read = data.size();
// TODO(chogan): @optimization Avoid the copy
memcpy(blob->data, data.data(), bytes_read);
u8 *read_dest = (u8 *)blob->data + total_bytes_read;
memcpy(read_dest, data.data(), bytes_read);
}
} else {
bytes_read = LocalReadBufferById(context, id, blob, total_bytes_read);
Expand Down
28 changes: 14 additions & 14 deletions src/metadata_management.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ void PutId(MetadataManager *mdm, RpcContext *rpc, const std::string &name,
if (target_node == rpc->node_id) {
LocalPut(mdm, name.c_str(), id, map_type);
} else {
RpcCall<void>(rpc, target_node, "RemotePut", name, id, map_type);
RpcCall<bool>(rpc, target_node, "RemotePut", name, id, map_type);
}
}

Expand All @@ -195,7 +195,7 @@ void DeleteId(MetadataManager *mdm, RpcContext *rpc, const std::string &name,
if (target_node == rpc->node_id) {
LocalDelete(mdm, name.c_str(), map_type);
} else {
RpcCall<void>(rpc, target_node, "RemoteDelete", name, map_type);
RpcCall<bool>(rpc, target_node, "RemoteDelete", name, map_type);
}
}

Expand Down Expand Up @@ -354,7 +354,7 @@ void AddBlobIdToBucket(MetadataManager *mdm, RpcContext *rpc, BlobID blob_id,
if (target_node == rpc->node_id) {
LocalAddBlobIdToBucket(mdm, bucket_id, blob_id);
} else {
RpcCall<void>(rpc, target_node, "RemoteAddBlobIdToBucket", bucket_id,
RpcCall<bool>(rpc, target_node, "RemoteAddBlobIdToBucket", bucket_id,
blob_id);
}
}
Expand All @@ -366,7 +366,7 @@ void AddBlobIdToVBucket(MetadataManager *mdm, RpcContext *rpc, BlobID blob_id,
if (target_node == rpc->node_id) {
LocalAddBlobIdToVBucket(mdm, vbucket_id, blob_id);
} else {
RpcCall<void>(rpc, target_node, "RemoteAddBlobIdToVBucket", vbucket_id,
RpcCall<bool>(rpc, target_node, "RemoteAddBlobIdToVBucket", vbucket_id,
blob_id);
}
}
Expand Down Expand Up @@ -487,7 +487,7 @@ void FreeBufferIdList(SharedMemoryContext *context, RpcContext *rpc,
if (target_node == rpc->node_id) {
LocalFreeBufferIdList(context, blob_id);
} else {
RpcCall<void>(rpc, target_node, "RemoteFreeBufferIdList", blob_id);
RpcCall<bool>(rpc, target_node, "RemoteFreeBufferIdList", blob_id);
}
}

Expand Down Expand Up @@ -536,7 +536,7 @@ void RemoveBlobFromBucketInfo(SharedMemoryContext *context, RpcContext *rpc,
if (target_node == rpc->node_id) {
LocalRemoveBlobFromBucketInfo(context, bucket_id, blob_id);
} else {
RpcCall<void>(rpc, target_node, "RemoteRemoveBlobFromBucketInfo", bucket_id,
RpcCall<bool>(rpc, target_node, "RemoteRemoveBlobFromBucketInfo", bucket_id,
blob_id);
}
}
Expand All @@ -550,7 +550,7 @@ void DestroyBlobByName(SharedMemoryContext *context, RpcContext *rpc,
if (blob_id_target_node == rpc->node_id) {
LocalDestroyBlobByName(context, rpc, blob_name.c_str(), blob_id);
} else {
RpcCall<void>(rpc, blob_id_target_node, "RemoteDestroyBlobByName",
RpcCall<bool>(rpc, blob_id_target_node, "RemoteDestroyBlobByName",
blob_name, blob_id);
}
RemoveBlobFromBucketInfo(context, rpc, bucket_id, blob_id);
Expand Down Expand Up @@ -592,7 +592,7 @@ void DestroyBlobById(SharedMemoryContext *context, RpcContext *rpc, BlobID id) {
if (target_node == rpc->node_id) {
LocalDestroyBlobById(context, rpc, id);
} else {
RpcCall<void>(rpc, target_node, "RemoteDestroyBlobById", id);
RpcCall<bool>(rpc, target_node, "RemoteDestroyBlobById", id);
}
}

Expand Down Expand Up @@ -624,7 +624,7 @@ void RenameBucket(SharedMemoryContext *context, RpcContext *rpc, BucketID id,
if (target_node == rpc->node_id) {
LocalRenameBucket(context, rpc, id, old_name.c_str(), new_name.c_str());
} else {
RpcCall<void>(rpc, target_node, "RemoteRenameBucket", id, old_name,
RpcCall<bool>(rpc, target_node, "RemoteRenameBucket", id, old_name,
new_name);
}
}
Expand All @@ -641,7 +641,7 @@ void IncrementRefcount(SharedMemoryContext *context, RpcContext *rpc,
if (target_node == rpc->node_id) {
LocalIncrementRefcount(context, id);
} else {
RpcCall<void>(rpc, target_node, "RemoteIncrementRefcount", id);
RpcCall<bool>(rpc, target_node, "RemoteIncrementRefcount", id);
}
}

Expand All @@ -658,7 +658,7 @@ void DecrementRefcount(SharedMemoryContext *context, RpcContext *rpc,
if (target_node == rpc->node_id) {
LocalDecrementRefcount(context, id);
} else {
RpcCall<void>(rpc, target_node, "RemoteDecrementRefcount", id);
RpcCall<bool>(rpc, target_node, "RemoteDecrementRefcount", id);
}
}

Expand Down Expand Up @@ -765,7 +765,7 @@ void UpdateGlobalSystemViewState(SharedMemoryContext *context,
if (target_node == rpc->node_id) {
LocalUpdateGlobalSystemViewState(context, adjustments);
} else {
RpcCall<void>(rpc, target_node, "RemoteUpdateGlobalSystemViewState",
RpcCall<bool>(rpc, target_node, "RemoteUpdateGlobalSystemViewState",
adjustments);
}
}
Expand Down Expand Up @@ -954,7 +954,7 @@ void IncrementRefcount(SharedMemoryContext *context, RpcContext *rpc,
if (target_node == rpc->node_id) {
LocalIncrementRefcount(context, id);
} else {
RpcCall<void>(rpc, target_node, "RemoteIncrementRefcountVBucket", id);
RpcCall<bool>(rpc, target_node, "RemoteIncrementRefcountVBucket", id);
}
}

Expand All @@ -971,7 +971,7 @@ void DecrementRefcount(SharedMemoryContext *context, RpcContext *rpc,
if (target_node == rpc->node_id) {
LocalDecrementRefcount(context, id);
} else {
RpcCall<void>(rpc, target_node, "RemoteDecrementRefcountVBucket", id);
RpcCall<bool>(rpc, target_node, "RemoteDecrementRefcountVBucket", id);
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/metadata_management.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,12 @@ void IncrementRefcount(SharedMemoryContext *context, RpcContext *rpc,
*/
void DecrementRefcount(SharedMemoryContext *context, RpcContext *rpc,
VBucketID id);

/**
*
*/
bool IsNullBlobId(BlobID id);

} // namespace hermes

#endif // HERMES_METADATA_MANAGEMENT_H_

0 comments on commit 2908518

Please sign in to comment.