diff --git a/adapter/src/hermes/adapter/stdio/mapper/balanced_mapper.cc b/adapter/src/hermes/adapter/stdio/mapper/balanced_mapper.cc index 506b15947..9fe5099e4 100644 --- a/adapter/src/hermes/adapter/stdio/mapper/balanced_mapper.cc +++ b/adapter/src/hermes/adapter/stdio/mapper/balanced_mapper.cc @@ -38,7 +38,7 @@ MapperReturnType BalancedMapper::map(const FileStruct& file_op) { : file_op.size_ - size_mapped; file.size_ = hermes.size_; - hermes.blob_name_ = std::to_string(page_index); + hermes.blob_name_ = std::to_string(page_index + 1); mapper_return.emplace_back(file, hermes); size_mapped += hermes.size_; } diff --git a/adapter/src/hermes/adapter/stdio/stdio.cc b/adapter/src/hermes/adapter/stdio/stdio.cc index 9e0d7c3b7..9c59a0b6d 100644 --- a/adapter/src/hermes/adapter/stdio/stdio.cc +++ b/adapter/src/hermes/adapter/stdio/stdio.cc @@ -193,8 +193,7 @@ size_t write_internal(std::pair &existing, const void *ptr, << std::endl; for (const auto &item : mapping) { hapi::Context ctx; - size_t pos = item.second.blob_name_.find(kStringDelimiter) + 1; - auto index = std::stol(item.second.blob_name_.substr(pos)); + auto index = std::stol(item.second.blob_name_) - 1; auto blob_exists = existing.first.st_bkid->ContainsBlob(item.second.blob_name_); unsigned char *put_data_ptr = (unsigned char *)ptr + data_offset; @@ -556,7 +555,7 @@ int HERMES_DECL(fflush)(FILE *fp) { auto offset_map = std::unordered_map(); for (const auto &blob_name : blob_names) { file_vbucket.Link(blob_name, filename, ctx); - auto page_index = std::stol(blob_name); + auto page_index = std::stol(blob_name) - 1; offset_map.emplace(blob_name, page_index * kPageSize); } auto trait = hermes::api::FileMappingTrait(filename, offset_map, @@ -600,7 +599,7 @@ int HERMES_DECL(fclose)(FILE *fp) { for (const auto &blob_name : blob_names) { auto status = file_vbucket.Link(blob_name, filename, ctx); if (!status.Failed()) { - auto page_index = std::stol(blob_name); + auto page_index = std::stol(blob_name) - 1; offset_map.emplace(blob_name, page_index * kPageSize); } } diff --git a/src/api/vbucket.cc b/src/api/vbucket.cc index faf4e4046..ca6161533 100644 --- a/src/api/vbucket.cc +++ b/src/api/vbucket.cc @@ -23,9 +23,7 @@ namespace hermes { namespace api { -bool VBucket::IsValid() const { - return !IsNullVBucketId(id_); -} +bool VBucket::IsValid() const { return !IsNullVBucketId(id_); } Status VBucket::Link(std::string blob_name, std::string bucket_name) { Status result = Link(blob_name, bucket_name, ctx_); @@ -44,7 +42,6 @@ Status VBucket::Link(std::string blob_name, std::string bucket_name, bool blob_exists = hermes_->BucketContainsBlob(bucket_name, blob_name); if (blob_exists) { // inserting value by insert function - linked_blobs_.push_back(make_pair(bucket_name, blob_name)); TraitInput input; input.bucket_name = bucket_name; input.blob_name = blob_name; @@ -54,6 +51,8 @@ Status VBucket::Link(std::string blob_name, std::string bucket_name, // TODO(hari): @errorhandling Check if linking was successful } } + AttachBlobToVBucket(&hermes_->context_, &hermes_->rpc_, blob_name.data(), + bucket_name.data(), id_); } else { ret = BLOB_NOT_IN_BUCKET; LOG(ERROR) << ret.Msg(); @@ -76,8 +75,14 @@ Status VBucket::Unlink(std::string blob_name, std::string bucket_name, LOG(INFO) << "Unlinking blob " << blob_name << " in bucket " << bucket_name << " from VBucket " << name_ << '\n'; bool found = false; - for (auto ci = linked_blobs_.begin(); ci != linked_blobs_.end(); ++ci) { - if (ci->first == bucket_name && ci->second == blob_name) { + auto blob_ids = + GetBlobsFromVBucketInfo(&hermes_->context_, &hermes_->rpc_, id_); + auto bucket_id = + GetBucketId(&hermes_->context_, &hermes_->rpc_, bucket_name.c_str()); + auto selected_blob_id = GetBlobId(&hermes_->context_, &hermes_->rpc_, + blob_name.c_str(), bucket_id); + for (hermes::BlobID& blob_id : blob_ids) { + if (selected_blob_id.as_int == blob_id.as_int) { TraitInput input; input.bucket_name = bucket_name; input.blob_name = blob_name; @@ -87,7 +92,8 @@ Status VBucket::Unlink(std::string blob_name, std::string bucket_name, // TODO(hari): @errorhandling Check if unlinking was successful } } - linked_blobs_.erase(ci); + RemoveBlobFromVBucketInfo(&hermes_->context_, &hermes_->rpc_, id_, + blob_name.c_str(), bucket_name.c_str()); found = true; break; } @@ -107,10 +113,14 @@ bool VBucket::ContainsBlob(std::string blob_name, std::string bucket_name) { LOG(INFO) << "Checking if blob " << blob_name << " from bucket " << bucket_name << " is in this VBucket " << name_ << '\n'; - for (auto ci = linked_blobs_.begin(); ci != linked_blobs_.end(); ++ci) { - bk_tmp = ci->first; - blob_tmp = ci->second; - if (bk_tmp == bucket_name && blob_tmp == blob_name) ret = true; + auto blob_ids = + GetBlobsFromVBucketInfo(&hermes_->context_, &hermes_->rpc_, id_); + auto bucket_id = + GetBucketId(&hermes_->context_, &hermes_->rpc_, bucket_name.c_str()); + auto selected_blob_id = GetBlobId(&hermes_->context_, &hermes_->rpc_, + blob_name.c_str(), bucket_id); + for (const auto& blob_id : blob_ids) { + if (selected_blob_id.as_int == blob_id.as_int) ret = true; } return ret; @@ -128,12 +138,14 @@ Blob& VBucket::GetBlob(std::string blob_name, std::string bucket_name) { return local_blob; } -template -std::vector VBucket::GetLinks(Predicate pred, Context& ctx) { +std::vector VBucket::GetLinks(Context& ctx) { + (void)ctx; LOG(INFO) << "Getting subset of links satisfying pred in VBucket " << name_ << '\n'; - - return std::vector(); + auto blob_ids = + GetBlobsFromVBucketInfo(&hermes_->context_, &hermes_->rpc_, id_); + // TODO(hari): add filtering + return blob_ids; } Status VBucket::Attach(Trait* trait) { @@ -156,11 +168,17 @@ Status VBucket::Attach(Trait* trait, Context& ctx) { } } if (!selected_trait) { - for (auto ci = linked_blobs_.begin(); ci != linked_blobs_.end(); ++ci) { + auto blob_ids = + GetBlobsFromVBucketInfo(&hermes_->context_, &hermes_->rpc_, id_); + for (const auto& blob_id : blob_ids) { Trait* t = static_cast(trait); TraitInput input; - input.bucket_name = ci->first; - input.blob_name = ci->second; + auto bucket_id = + GetBucketIdFromBlobId(&hermes_->context_, &hermes_->rpc_, blob_id); + input.bucket_name = + GetBucketNameById(&hermes_->context_, &hermes_->rpc_, bucket_id); + input.blob_name = + GetBlobNameFromId(&hermes_->context_, &hermes_->rpc_, blob_id); if (t->onAttachFn != nullptr) { t->onAttachFn(input, trait); // TODO(hari): @errorhandling Check if attach was successful @@ -201,11 +219,17 @@ Status VBucket::Detach(Trait* trait, Context& ctx) { } } if (selected_trait) { - for (auto ci = linked_blobs_.begin(); ci != linked_blobs_.end(); ++ci) { + auto blob_ids = + GetBlobsFromVBucketInfo(&hermes_->context_, &hermes_->rpc_, id_); + for (const auto& blob_id : blob_ids) { Trait* t = static_cast(trait); TraitInput input; - input.bucket_name = ci->first; - input.blob_name = ci->second; + auto bucket_id = + GetBucketIdFromBlobId(&hermes_->context_, &hermes_->rpc_, blob_id); + input.bucket_name = + GetBucketNameById(&hermes_->context_, &hermes_->rpc_, bucket_id); + input.blob_name = + GetBlobNameFromId(&hermes_->context_, &hermes_->rpc_, blob_id); if (t->onDetachFn != nullptr) { t->onDetachFn(input, trait); // TODO(hari): @errorhandling Check if detach was successful @@ -264,12 +288,17 @@ Status VBucket::Delete(Context& ctx) { } } } - for (auto ci = linked_blobs_.begin(); ci != linked_blobs_.end(); ++ci) { + auto blob_ids = + GetBlobsFromVBucketInfo(&hermes_->context_, &hermes_->rpc_, id_); + for (const auto& blob_id : blob_ids) { + TraitInput input; + auto bucket_id = + GetBucketIdFromBlobId(&hermes_->context_, &hermes_->rpc_, blob_id); + input.bucket_name = + GetBucketNameById(&hermes_->context_, &hermes_->rpc_, bucket_id); + input.blob_name = + GetBlobNameFromId(&hermes_->context_, &hermes_->rpc_, blob_id); if (attached_traits_.size() > 0) { - TraitInput input; - input.bucket_name = ci->first; - input.blob_name = ci->second; - if (this->persist) { if (t->type == TraitType::FILE_MAPPING) { FileMappingTrait* fileBackedTrait = (FileMappingTrait*)t; @@ -278,20 +307,13 @@ Status VBucket::Delete(Context& ctx) { fileBackedTrait->flush_cb(input, fileBackedTrait); } else { if (!fileBackedTrait->offset_map.empty()) { - auto iter = fileBackedTrait->offset_map.find(ci->second); + auto iter = fileBackedTrait->offset_map.find(input.blob_name); if (iter != fileBackedTrait->offset_map.end()) { - BucketID bucket_id = - GetBucketId(&hermes_->context_, &hermes_->rpc_, - ci->first.c_str()); - auto blob_id = - GetBlobId(&hermes_->context_, &hermes_->rpc_, ci->second, - bucket_id); + // TODO(hari): @errorhandling check return of StdIoPersistBlob ret = StdIoPersistBlob(&hermes_->context_, &hermes_->rpc_, &hermes_->trans_arena_, blob_id, file, iter->second); - if (!ret.Succeeded()) { - LOG(ERROR) << ret.Msg(); - } + if (!ret.Succeeded()) LOG(ERROR) << ret.Msg(); } else { ret = BLOB_NOT_LINKED_IN_MAP; LOG(ERROR) << ret.Msg(); @@ -313,6 +335,9 @@ Status VBucket::Delete(Context& ctx) { // TODO(hari): @errorhandling Check if unlinking was successful } } + RemoveBlobFromVBucketInfo(&hermes_->context_, &hermes_->rpc_, id_, + input.blob_name.c_str(), + input.bucket_name.c_str()); } if (persist) { if (file != nullptr) { @@ -324,7 +349,6 @@ Status VBucket::Delete(Context& ctx) { } } } - linked_blobs_.clear(); attached_traits_.clear(); return Status(); } diff --git a/src/api/vbucket.h b/src/api/vbucket.h index 8370f9c73..8af6ff305 100644 --- a/src/api/vbucket.h +++ b/src/api/vbucket.h @@ -31,7 +31,6 @@ class VBucket { private: std::string name_; VBucketID id_; - std::list> linked_blobs_; std::list attached_traits_; Blob local_blob; bool persist; @@ -44,8 +43,7 @@ class VBucket { VBucket(std::string initial_name, std::shared_ptr const &h, bool persist, Context ctx = Context()) : name_(initial_name), - id_({0, 0}), - linked_blobs_(), + id_({{0, 0}}), attached_traits_(), local_blob(), persist(persist), @@ -66,7 +64,6 @@ class VBucket { ~VBucket() { name_.clear(); - linked_blobs_.clear(); } bool IsValid() const; @@ -88,10 +85,9 @@ class VBucket { /** get a blob linked to this vbucket */ Blob &GetBlob(std::string blob_name, std::string bucket_name); - /** retrieves the subset of links satisfying pred */ + /** retrieves the subset of blob links satisfying pred */ /** could return iterator */ - template - std::vector GetLinks(Predicate pred, Context &ctx); + std::vector GetLinks(Context &ctx); /** attach a trait to this vbucket */ Status Attach(Trait *trait, Context &ctx); diff --git a/src/metadata_management.cc b/src/metadata_management.cc index 4d1c62c70..5c2bb5b51 100644 --- a/src/metadata_management.cc +++ b/src/metadata_management.cc @@ -1200,4 +1200,57 @@ GetRemainingTargetCapacities(SharedMemoryContext *context, RpcContext *rpc, return result; } +void AttachBlobToVBucket(SharedMemoryContext *context, RpcContext *rpc, + const char *blob_name, const char *bucket_name, + VBucketID vbucket_id) { + MetadataManager *mdm = GetMetadataManagerFromContext(context); + BucketID bucket_id = GetBucketId(context, rpc, bucket_name); + BlobID blob_id = GetBlobId(context, rpc, blob_name, bucket_id); + AddBlobIdToVBucket(mdm, rpc, blob_id, vbucket_id); +} + +std::string LocalGetBucketNameById(SharedMemoryContext *context, + BucketID blob_id) { + MetadataManager *mdm = GetMetadataManagerFromContext(context); + std::string bucket_name = + ReverseGetFromStorage(mdm, blob_id.as_int, kMapType_Bucket); + return bucket_name; +} + +std::vector GetBlobsFromVBucketInfo(SharedMemoryContext *context, + RpcContext *rpc, + VBucketID vbucket_id) { + u32 target_node = vbucket_id.bits.node_id; + if (target_node == rpc->node_id) { + return LocalGetBlobsFromVBucketInfo(context, vbucket_id); + } else { + return RpcCall>( + rpc, target_node, "RemoteGetBlobsFromVBucketInfo", vbucket_id); + } +} + +void RemoveBlobFromVBucketInfo(SharedMemoryContext *context, RpcContext *rpc, + VBucketID vbucket_id, const char *blob_name, + const char *bucket_name) { + BucketID bucket_id = GetBucketId(context, rpc, bucket_name); + BlobID blob_id = GetBlobId(context, rpc, blob_name, bucket_id); + u32 target_node = vbucket_id.bits.node_id; + if (target_node == rpc->node_id) { + LocalRemoveBlobFromVBucketInfo(context, vbucket_id, blob_id); + } else { + RpcCall(rpc, target_node, "RemoteRemoveBlobFromVBucketInfo", + vbucket_id, blob_id); + } +} + +std::string GetBucketNameById(SharedMemoryContext *context, RpcContext *rpc, + BucketID id) { + auto target_node = id.bits.node_id; + if (target_node == rpc->node_id) { + return LocalGetBucketNameById(context, id); + } else { + return RpcCall(rpc, target_node, "RemoteGetBucketNameById", + id); + } +} } // namespace hermes diff --git a/src/metadata_management.h b/src/metadata_management.h index ad9c31ed3..cd4d38853 100644 --- a/src/metadata_management.h +++ b/src/metadata_management.h @@ -197,7 +197,7 @@ BlobID GetBlobId(SharedMemoryContext *context, RpcContext *rpc, /** * */ -std::string GetBlobNameFromId(MetadataManager *mdm, RpcContext *rpc, +std::string GetBlobNameFromId(SharedMemoryContext *context, RpcContext *rpc, BlobID blob_id); /** @@ -336,6 +336,32 @@ void LocalBeginGlobalTicketMutex(MetadataManager *mdm); */ void LocalEndGlobalTicketMutex(MetadataManager *mdm); +/** + * + */ +void AttachBlobToVBucket(SharedMemoryContext *context, RpcContext *rpc, + const char *blob_name, const char *bucket_name, + VBucketID vbucket_id); + +/** + * + */ +void RemoveBlobFromVBucketInfo(SharedMemoryContext *context, RpcContext *rpc, + VBucketID vbucket_id, const char *blob_name, + const char *bucket_name); + +/** + * + */ +std::vector GetBlobsFromVBucketInfo(SharedMemoryContext *context, + RpcContext *rpc, + VBucketID vbucket_id); + +/** + * + */ +std::string GetBucketNameById(SharedMemoryContext *context, RpcContext *rpc, + BucketID id); } // namespace hermes #endif // HERMES_METADATA_MANAGEMENT_H_ diff --git a/src/metadata_management_internal.h b/src/metadata_management_internal.h index b4f1b0f33..48c9bb213 100644 --- a/src/metadata_management_internal.h +++ b/src/metadata_management_internal.h @@ -125,5 +125,12 @@ VBucketID LocalGetOrCreateVBucketId(SharedMemoryContext *context, u64 HexStringToU64(const std::string &s); std::string MakeInternalBlobName(const std::string &name, BucketID id); +void LocalRemoveBlobFromVBucketInfo(SharedMemoryContext *context, + VBucketID vbucket_id, BlobID blob_id); +std::vector LocalGetBlobsFromVBucketInfo(SharedMemoryContext *context, + VBucketID vbucket_id); +std::string LocalGetBucketNameById(SharedMemoryContext *context, + BucketID blob_id); + } // namespace hermes #endif // HERMES_METADATA_MANAGEMENT_INTERNAL_H_ diff --git a/src/metadata_storage_stb_ds.cc b/src/metadata_storage_stb_ds.cc index ba54842d1..90e8ee182 100644 --- a/src/metadata_storage_stb_ds.cc +++ b/src/metadata_storage_stb_ds.cc @@ -699,4 +699,34 @@ void InitMetadataStorage(SharedMemoryContext *context, MetadataManager *mdm, mdm->blob_map_offset = GetOffsetFromMdm(mdm, blob_map); } +std::vector LocalGetBlobsFromVBucketInfo(SharedMemoryContext *context, + VBucketID vbucket_id) { + MetadataManager *mdm = GetMetadataManagerFromContext(context); + BeginTicketMutex(&mdm->vbucket_mutex); + VBucketInfo *info = LocalGetVBucketInfoById(mdm, vbucket_id); + ChunkedIdList *blobs = &info->blobs; + BlobID *blobs_arr = (BlobID *)GetIdsPtr(mdm, *blobs); + std::vector blobids(blobs_arr, blobs_arr + blobs->length); + ReleaseIdsPtr(mdm); + EndTicketMutex(&mdm->vbucket_mutex); + return blobids; +} + +void LocalRemoveBlobFromVBucketInfo(SharedMemoryContext *context, + VBucketID vbucket_id, BlobID blob_id) { + MetadataManager *mdm = GetMetadataManagerFromContext(context); + BeginTicketMutex(&mdm->vbucket_mutex); + VBucketInfo *info = LocalGetVBucketInfoById(mdm, vbucket_id); + ChunkedIdList *blobs = &info->blobs; + BlobID *blobs_arr = (BlobID *)GetIdsPtr(mdm, *blobs); + for (u32 i = 0; i < blobs->length; ++i) { + if (blobs_arr[i].as_int == blob_id.as_int) { + blobs_arr[i] = blobs_arr[--blobs->length]; + break; + } + } + ReleaseIdsPtr(mdm); + EndTicketMutex(&mdm->vbucket_mutex); +} + } // namespace hermes diff --git a/src/rpc_thallium.cc b/src/rpc_thallium.cc index 7c08b7e65..900bb139f 100644 --- a/src/rpc_thallium.cc +++ b/src/rpc_thallium.cc @@ -337,6 +337,22 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc, state->engine->finalize(); }; + auto rpc_remove_blob_from_vbucket_info = [context](const request &req, + VBucketID vbucket_id, + BlobID blob_id) { + LocalRemoveBlobFromVBucketInfo(context, vbucket_id, blob_id); + req.respond(true); + }; + auto rpc_get_blobs_from_vbucket_info = [context](const request &req, + VBucketID vbucket_id) { + auto ret = LocalGetBlobsFromVBucketInfo(context, vbucket_id); + req.respond(ret); + }; + auto rpc_get_bucket_name_by_id = [context](const request &req, BucketID id) { + auto ret = LocalGetBucketNameById(context, id); + req.respond(ret); + }; + // TODO(chogan): Currently these three are only used for testing. rpc_server->define("GetBuffers", rpc_get_buffers); rpc_server->define("SplitBuffers", rpc_split_buffers).disable_response(); @@ -383,6 +399,12 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc, rpc_get_bucket_id_from_blob_id); rpc_server->define("RemoteGetBlobNameFromId", rpc_get_blob_name_from_id); rpc_server->define("RemoteFinalize", rpc_finalize).disable_response(); + rpc_server->define("RemoteRemoveBlobFromVBucketInfo", + rpc_remove_blob_from_vbucket_info); + rpc_server->define("RemoteGetBlobsFromVBucketInfo", + rpc_get_blobs_from_vbucket_info); + rpc_server->define("RemoteGetBucketNameById", + rpc_get_bucket_name_by_id); } void StartBufferOrganizer(SharedMemoryContext *context, RpcContext *rpc, diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 878c3d9ef..8a1040f2e 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -12,7 +12,7 @@ include_directories( # API tests #------------------------------------------------------------------------------ set(API_TESTS dpe_optimization_test dpe_random_test - dpe_roundrobin_test end_to_end_test) + dpe_roundrobin_test end_to_end_test vbucket_test) foreach(program ${API_TESTS}) add_executable(${program} ${program}.cc) diff --git a/test/vbucket_test.cc b/test/vbucket_test.cc new file mode 100644 index 000000000..4d631f6d7 --- /dev/null +++ b/test/vbucket_test.cc @@ -0,0 +1,105 @@ +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * Distributed under BSD 3-Clause license. * + * Copyright by The HDF Group. * + * Copyright by the Illinois Institute of Technology. * + * All rights reserved. * + * * + * This file is part of Hermes. The full Hermes copyright notice, including * + * terms governing use, modification, and redistribution, is contained in * + * the COPYING file, which can be found at the top directory. If you do not * + * have access to the file, you may request a copy from help@hdfgroup.org. * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +#include +#include +#include + +#include "bucket.h" +#include "hermes.h" +#include "test_utils.h" + +namespace hapi = hermes::api; + +int main(int argc, char **argv) { + int mpi_threads_provided; + MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &mpi_threads_provided); + if (mpi_threads_provided < MPI_THREAD_MULTIPLE) { + fprintf(stderr, "Didn't receive appropriate MPI threading specification\n"); + return 1; + } + + char *config_file = 0; + if (argc == 2) { + config_file = argv[1]; + } + + std::shared_ptr hermes = hapi::InitHermes(config_file); + + int num_blobs_per_rank = 16; + int data_size = 8 * 1024; + hapi::Blob put_data(data_size, rand() % 255); + hapi::Blob get_data(data_size, 255); + if (hermes->IsApplicationCore()) { + int app_rank = hermes->GetProcessRank(); + int app_size = hermes->GetNumProcesses(); + + hapi::Context ctx; + + // Each rank puts and gets its portion of a blob to a shared bucket + std::string bucket_name = "test_bucket" + std::to_string(app_rank); + hapi::Bucket rank_bucket(bucket_name, hermes, ctx); + for (int i = 0; i < num_blobs_per_rank; ++i) { + std::string blob_name = + "Blob_" + std::to_string(app_rank) + "_" + std::to_string(i); + rank_bucket.Put(blob_name, put_data, ctx); + } + hapi::VBucket shared("shared_vbucket", hermes, false, ctx); + for (int i = 0; i < num_blobs_per_rank; ++i) { + std::string blob_name = + "Blob_" + std::to_string(app_rank) + "_" + std::to_string(i); + shared.Link(blob_name, bucket_name, ctx); + } + hermes->AppBarrier(); + if (app_rank == 0) { + auto blob_ids = shared.GetLinks(ctx); + Assert(blob_ids.size() == (unsigned long)app_size * num_blobs_per_rank); + for (int rank = 0; rank < app_size; ++rank) { + std::string bucket_name_temp = "test_bucket" + std::to_string(rank); + for (int i = 0; i < num_blobs_per_rank; ++i) { + std::string blob_name = + "Blob_" + std::to_string(rank) + "_" + std::to_string(i); + auto exists = shared.ContainsBlob(blob_name, bucket_name_temp); + Assert(exists); + } + } + } + hermes->AppBarrier(); + rank_bucket.Destroy(ctx); + hermes->AppBarrier(); + if (app_rank == 0) { + auto blob_ids = shared.GetLinks(ctx); + Assert(blob_ids.size() == (unsigned long)app_size * num_blobs_per_rank); + for (int rank = 0; rank < app_size; ++rank) { + std::string bucket_name_temp = "test_bucket" + std::to_string(rank); + for (int i = 0; i < num_blobs_per_rank; ++i) { + std::string blob_name = + "Blob_" + std::to_string(rank) + "_" + std::to_string(i); + auto exists = shared.ContainsBlob(blob_name, bucket_name_temp); + Assert(!exists); + } + } + } + if (app_rank == 0) { + shared.Delete(ctx); + } + hermes->AppBarrier(); + } else { + // Hermes core. No user code here. + } + + hermes->Finalize(); + + MPI_Finalize(); + + return 0; +}