diff --git a/adapter/include/hermes/adapter/mpiio.h b/adapter/include/hermes/adapter/mpiio.h index 705e5cf83..97755f644 100644 --- a/adapter/include/hermes/adapter/mpiio.h +++ b/adapter/include/hermes/adapter/mpiio.h @@ -33,12 +33,14 @@ /** * Internal headers */ -#include #include +#include +#include + +#include +#include #include #include -#include -#include #include #include diff --git a/adapter/include/hermes/adapter/posix.h b/adapter/include/hermes/adapter/posix.h index 4dfdcfc39..f5d8d4b94 100644 --- a/adapter/include/hermes/adapter/posix.h +++ b/adapter/include/hermes/adapter/posix.h @@ -28,15 +28,17 @@ /** * Internal headers */ -#include #include +#include +#include + +#include #include +#include #include #include #include #include -#include -#include /** * Function declarations diff --git a/adapter/include/hermes/adapter/stdio.h b/adapter/include/hermes/adapter/stdio.h index 1e5210e60..4f5021d43 100644 --- a/adapter/include/hermes/adapter/stdio.h +++ b/adapter/include/hermes/adapter/stdio.h @@ -33,6 +33,7 @@ */ #include #include +#include #include #include #include diff --git a/adapter/src/hermes/adapter/constants.h b/adapter/src/hermes/adapter/constants.h index 83ed19335..2c8ee8b35 100644 --- a/adapter/src/hermes/adapter/constants.h +++ b/adapter/src/hermes/adapter/constants.h @@ -13,6 +13,8 @@ #ifndef HERMES_ADAPTER_CONSTANTS_H #define HERMES_ADAPTER_CONSTANTS_H +#include + /** * Constants file for all adapter. * This file contains common constants across different adapters. @@ -88,4 +90,40 @@ const char* kAdapterScratchMode = "SCRATCH"; * Default value: \c 1 */ const char* kStopDaemon = "HERMES_STOP_DAEMON"; + +/** + * The page size in bytes when mapping files to Hermes Blobs. Hermes will be + * more efficient if you set this number to the most common size of writes in + * your application. This is set via HERMES_PAGE_SIZE. + * + * Example: With a page size of 1MiB, a 100 MiB file will be mapped to 100, 1MiB + * Blobs. In this scenario, doing several smaller writes within the same 1MiB + * region will cause all of those writes to transfer 1MiB of data, which is why + * it's important to align the page size to your workload. + */ +const size_t kPageSize = []() { + const char *kPageSizeVar = "HERMES_PAGE_SIZE"; + const size_t kDefaultPageSize = 1 * 1024 * 1024; + + size_t result = kDefaultPageSize; + char *page_size = getenv(kPageSizeVar); + + if (page_size) { + result = (size_t)std::strtoull(page_size, NULL, 0); + if (result == 0) { + LOG(FATAL) << "Invalid value of " << kPageSizeVar << ": " << page_size; + } + } + + LOG(INFO) << "Adapter page size: " << result << "\n"; + + return result; +}(); + +/** + * Set this environment variable to '1' for more efficient performance on + * workloads that are write-only. + */ +const char* kHermesWriteOnlyVar = "HERMES_WRITE_ONLY"; + #endif // HERMES_ADAPTER_CONSTANTS_H diff --git a/adapter/src/hermes/adapter/interceptor.h b/adapter/src/hermes/adapter/interceptor.h index 678c034d9..221e357ac 100644 --- a/adapter/src/hermes/adapter/interceptor.h +++ b/adapter/src/hermes/adapter/interceptor.h @@ -24,7 +24,6 @@ #include #include -#include #include #include diff --git a/adapter/src/hermes/adapter/mpiio/CMakeLists.txt b/adapter/src/hermes/adapter/mpiio/CMakeLists.txt index 5b28cad54..6efe010ee 100644 --- a/adapter/src/hermes/adapter/mpiio/CMakeLists.txt +++ b/adapter/src/hermes/adapter/mpiio/CMakeLists.txt @@ -17,18 +17,11 @@ set(MPIIO_ADAPTER_PRIVATE_HEADER ${CMAKE_CURRENT_SOURCE_DIR}/metadata_manager.h ${CMAKE_CURRENT_SOURCE_DIR}/common/enumerations.h ${CMAKE_CURRENT_SOURCE_DIR}/common/constants.h) -set(MPIIO_INTERNAL_ADAPTER_SRC ${CMAKE_CURRENT_SOURCE_DIR}/metadata_manager.cc - ${CMAKE_CURRENT_SOURCE_DIR}/mapper/balanced_mapper.cc) - # Add library hermes_mpiio add_library(hermes_mpiio SHARED ${MPIIO_ADAPTER_PRIVATE_HEADER} ${MPIIO_ADAPTER_PUBLIC_HEADER} ${MPIIO_ADAPTER_SRC}) add_dependencies(hermes_mpiio hermes) target_link_libraries(hermes_mpiio hermes MPI::MPI_CXX) -add_library(hermes_mpiio_internal SHARED ${MPIIO_ADAPTER_PRIVATE_HEADER} ${MPIIO_ADAPTER_PUBLIC_HEADER} ${MPIIO_INTERNAL_ADAPTER_SRC}) -add_dependencies(hermes_mpiio_internal hermes) -target_link_libraries(hermes_mpiio_internal hermes MPI::MPI_CXX) - #----------------------------------------------------------------------------- # Add Target(s) to CMake Install #----------------------------------------------------------------------------- @@ -46,4 +39,4 @@ install( #----------------------------------------------------------------------------- if(HERMES_ENABLE_COVERAGE) set_coverage_flags(hermes_mpiio) -endif() \ No newline at end of file +endif() diff --git a/adapter/src/hermes/adapter/mpiio/common/constants.h b/adapter/src/hermes/adapter/mpiio/common/constants.h index cf85363df..a2d71412b 100644 --- a/adapter/src/hermes/adapter/mpiio/common/constants.h +++ b/adapter/src/hermes/adapter/mpiio/common/constants.h @@ -35,10 +35,7 @@ using hermes::adapter::mpiio::MapperType; * Which mapper to be used by MPIIO adapter. */ const MapperType kMapperType = MapperType::BALANCED; -/** - * Define kPageSize for balanced mapping. - */ -const size_t kPageSize = 1024 * 1024; + /** * String delimiter */ diff --git a/adapter/src/hermes/adapter/posix/CMakeLists.txt b/adapter/src/hermes/adapter/posix/CMakeLists.txt index 90aba5959..f64da4c04 100644 --- a/adapter/src/hermes/adapter/posix/CMakeLists.txt +++ b/adapter/src/hermes/adapter/posix/CMakeLists.txt @@ -17,18 +17,11 @@ set(POSIX_ADAPTER_PRIVATE_HEADER ${CMAKE_CURRENT_SOURCE_DIR}/metadata_manager.h ${CMAKE_CURRENT_SOURCE_DIR}/common/enumerations.h ${CMAKE_CURRENT_SOURCE_DIR}/common/constants.h) -set(POSIX_INTERNAL_ADAPTER_SRC ${CMAKE_CURRENT_SOURCE_DIR}/metadata_manager.cc - ${CMAKE_CURRENT_SOURCE_DIR}/mapper/balanced_mapper.cc) - # Add library hermes_posix add_library(hermes_posix SHARED ${POSIX_ADAPTER_PRIVATE_HEADER} ${POSIX_ADAPTER_PUBLIC_HEADER} ${POSIX_ADAPTER_SRC}) add_dependencies(hermes_posix hermes) target_link_libraries(hermes_posix hermes MPI::MPI_CXX) -add_library(hermes_posix_internal SHARED ${POSIX_ADAPTER_PRIVATE_HEADER} ${POSIX_ADAPTER_PUBLIC_HEADER} ${POSIX_INTERNAL_ADAPTER_SRC}) -add_dependencies(hermes_posix_internal hermes) -target_link_libraries(hermes_posix_internal hermes MPI::MPI_CXX) - #----------------------------------------------------------------------------- # Add Target(s) to CMake Install #----------------------------------------------------------------------------- @@ -46,4 +39,4 @@ install( #----------------------------------------------------------------------------- if(HERMES_ENABLE_COVERAGE) set_coverage_flags(hermes_posix) -endif() \ No newline at end of file +endif() diff --git a/adapter/src/hermes/adapter/posix/common/constants.h b/adapter/src/hermes/adapter/posix/common/constants.h index 4f85f76b1..21130b853 100644 --- a/adapter/src/hermes/adapter/posix/common/constants.h +++ b/adapter/src/hermes/adapter/posix/common/constants.h @@ -35,10 +35,7 @@ using hermes::adapter::posix::MapperType; * Which mapper to be used by POSIX adapter. */ const MapperType kMapperType = MapperType::BALANCED; -/** - * Define kPageSize for balanced mapping. - */ -const size_t kPageSize = 1024 * 1024; + /** * String delimiter */ diff --git a/adapter/src/hermes/adapter/posix/posix.cc b/adapter/src/hermes/adapter/posix/posix.cc index 3010e2994..3f1bcaa4d 100644 --- a/adapter/src/hermes/adapter/posix/posix.cc +++ b/adapter/src/hermes/adapter/posix/posix.cc @@ -13,7 +13,9 @@ #include #include + #include +#include #include #include @@ -24,11 +26,14 @@ using hermes::adapter::posix::MetadataManager; namespace hapi = hermes::api; namespace fs = std::experimental::filesystem; + +using hermes::u8; + /** * Internal Functions */ -size_t perform_file_write(std::string &filename, off_t offset, size_t size, - unsigned char *data_ptr) { +size_t perform_file_write(const std::string &filename, off_t offset, + size_t size, u8 *data_ptr) { LOG(INFO) << "Writing to file: " << filename << " offset: " << offset << " of size:" << size << "." << std::endl; INTERCEPTOR_LIST->hermes_flush_exclusion.insert(filename); @@ -130,199 +135,152 @@ int open_internal(const std::string &path_str, int flags, int mode) { return ret; } -size_t write_internal(std::pair &existing, const void *ptr, - size_t total_size, int fp) { - LOG(INFO) << "Write called for filename: " - << existing.first.st_bkid->GetName() - << " on offset: " << existing.first.st_ptr - << " and size: " << total_size << std::endl; - size_t ret; +void PutWithPosixFallback(AdapterStat &stat, const std::string &blob_name, + const std::string &filename, u8 *data, size_t size, + size_t offset) { + hapi::Context ctx; + const char *hermes_write_only = getenv(kHermesWriteOnlyVar); + + if (hermes_write_only && hermes_write_only[0] == '1') { + // Custom DPE for write-only apps like VPIC + ctx.rr_retry = true; + ctx.disable_swap = true; + } + + hapi::Status status = stat.st_bkid->Put(blob_name, data, size, ctx); + if (status.Failed()) { + LOG(WARNING) << "Failed to Put Blob " << blob_name << " to Bucket " + << filename << ". Falling back to posix I/O." << std::endl; + perform_file_write(filename, offset, size, data); + } else { + stat.st_blobs.emplace(blob_name); + } +} + +size_t write_internal(AdapterStat &stat, const void *ptr, size_t total_size, + int fp) { + std::shared_ptr bkt = stat.st_bkid; + std::string filename = bkt->GetName(); + + LOG(INFO) << "Write called for filename: " << filename << " on offset: " + << stat.st_ptr << " and size: " << total_size << std::endl; + + size_t ret = 0; auto mdm = hermes::adapter::Singleton::GetInstance(); auto mapper = MapperFactory().Get(kMapperType); auto mapping = mapper->map( - FileStruct(mdm->Convert(fp), existing.first.st_ptr, total_size)); + FileStruct(mdm->Convert(fp), stat.st_ptr, total_size)); size_t data_offset = 0; - auto filename = existing.first.st_bkid->GetName(); - LOG(INFO) << "Mapping for write has " << mapping.size() << " mapping." - << std::endl; - for (const auto &item : mapping) { - hapi::Context ctx; - auto index = std::stol(item.second.blob_name_) - 1; - auto blob_exists = - existing.first.st_bkid->ContainsBlob(item.second.blob_name_); - unsigned char *put_data_ptr = (unsigned char *)ptr + data_offset; - size_t put_data_ptr_size = item.first.size_; - - if (!blob_exists || item.second.size_ == kPageSize) { - LOG(INFO) << "Create or Overwrite blob " << item.second.blob_name_ - << " of size:" << item.second.size_ << "." << std::endl; - if (item.second.size_ == kPageSize) { - auto status = existing.first.st_bkid->Put( - item.second.blob_name_, put_data_ptr, put_data_ptr_size, ctx); - if (status.Failed()) { - perform_file_write(filename, item.first.offset_, put_data_ptr_size, - put_data_ptr); - } else { - existing.first.st_blobs.emplace(item.second.blob_name_); - } - } else if (item.second.offset_ == 0) { - auto status = existing.first.st_bkid->Put( - item.second.blob_name_, put_data_ptr, put_data_ptr_size, ctx); - if (status.Failed()) { - perform_file_write(filename, index * kPageSize, put_data_ptr_size, - put_data_ptr); - } else { - existing.first.st_blobs.emplace(item.second.blob_name_); - } + LOG(INFO) << "Mapping for write has " << mapping.size() << " mappings.\n"; + + for (const auto &[finfo, hinfo] : mapping) { + auto index = std::stol(hinfo.blob_name_) - 1; + size_t offset = index * kPageSize; + auto blob_exists = bkt->ContainsBlob(hinfo.blob_name_); + u8 *put_data_ptr = (u8 *)ptr + data_offset; + size_t put_data_ptr_size = finfo.size_; + + if (!blob_exists || hinfo.size_ == kPageSize) { + LOG(INFO) << "Create or Overwrite blob " << hinfo.blob_name_ + << " of size:" << hinfo.size_ << "." << std::endl; + if (hinfo.size_ == kPageSize) { + PutWithPosixFallback(stat, hinfo.blob_name_, filename, put_data_ptr, + put_data_ptr_size, finfo.offset_); + } else if (hinfo.offset_ == 0) { + PutWithPosixFallback(stat, hinfo.blob_name_, filename, put_data_ptr, + put_data_ptr_size, offset); } else { - hapi::Blob final_data(item.second.offset_ + item.second.size_); - if (fs::exists(filename) && - fs::file_size(filename) >= item.second.offset_) { - LOG(INFO) << "Blob has a gap in write. read gap from original file." - << std::endl; - INTERCEPTOR_LIST->hermes_flush_exclusion.insert(filename); - FILE *fh = fopen(filename.c_str(), "r"); - if (fh != nullptr) { - if (fseek(fh, index * kPageSize, SEEK_SET) == 0) { - size_t items_read = fread(final_data.data(), item.second.offset_, - sizeof(char), fh); - if (items_read != 1) { - // TODO(hari) @errorhandling read failed. - } - if (fclose(fh) != 0) { - // TODO(hari) @errorhandling fclose failed. - } - } else { - // TODO(hari) @errorhandling fseek failed. - } - } else { - // TODO(hari) @errorhandling FILE cannot be opened - } - INTERCEPTOR_LIST->hermes_flush_exclusion.erase(filename); - } - memcpy(final_data.data() + item.second.offset_, put_data_ptr, + hapi::Blob final_data(hinfo.offset_ + hinfo.size_); + + ReadGap(filename, offset, final_data.data(), hinfo.offset_, + hinfo.offset_); + memcpy(final_data.data() + hinfo.offset_, put_data_ptr, put_data_ptr_size); - auto status = existing.first.st_bkid->Put(item.second.blob_name_, - final_data, ctx); - if (status.Failed()) { - perform_file_write(filename, index * kPageSize, final_data.size(), - final_data.data()); - } else { - existing.first.st_blobs.emplace(item.second.blob_name_); - } + PutWithPosixFallback(stat, hinfo.blob_name_, filename, + final_data.data(), final_data.size(), offset); } - } else { - LOG(INFO) << "Blob " << item.second.blob_name_ - << " of size:" << item.second.size_ << " exists." << std::endl; + LOG(INFO) << "Blob " << hinfo.blob_name_ + << " of size:" << hinfo.size_ << " exists." << std::endl; hapi::Blob temp(0); - auto existing_blob_size = - existing.first.st_bkid->Get(item.second.blob_name_, temp, ctx); - if (item.second.offset_ == 0) { + auto existing_blob_size = bkt->Get(hinfo.blob_name_, temp); + if (hinfo.offset_ == 0) { LOG(INFO) << "Blob offset is 0" << std::endl; - if (item.second.size_ >= existing_blob_size) { - LOG(INFO) << "Overwrite blob " << item.second.blob_name_ - << " of size:" << item.second.size_ << "." << std::endl; - auto status = existing.first.st_bkid->Put( - item.second.blob_name_, put_data_ptr, put_data_ptr_size, ctx); - if (status.Failed()) { - perform_file_write(filename, index * kPageSize, put_data_ptr_size, - put_data_ptr); - } else { - existing.first.st_blobs.emplace(item.second.blob_name_); - } + if (hinfo.size_ >= existing_blob_size) { + LOG(INFO) << "Overwrite blob " << hinfo.blob_name_ + << " of size:" << hinfo.size_ << "." << std::endl; + PutWithPosixFallback(stat, hinfo.blob_name_, filename, + put_data_ptr, put_data_ptr_size, offset); } else { - LOG(INFO) << "Update blob " << item.second.blob_name_ + LOG(INFO) << "Update blob " << hinfo.blob_name_ << " of size:" << existing_blob_size << "." << std::endl; hapi::Blob existing_data(existing_blob_size); - existing.first.st_bkid->Get(item.second.blob_name_, existing_data, - ctx); + bkt->Get(hinfo.blob_name_, existing_data); memcpy(existing_data.data(), put_data_ptr, put_data_ptr_size); - auto status = existing.first.st_bkid->Put(item.second.blob_name_, - existing_data, ctx); - if (status.Failed()) { - perform_file_write(filename, index * kPageSize, existing_blob_size, - existing_data.data()); - } else { - existing.first.st_blobs.emplace(item.second.blob_name_); - } + + PutWithPosixFallback(stat, hinfo.blob_name_, filename, + existing_data.data(), existing_data.size(), + offset); } } else { - LOG(INFO) << "Blob offset: " << item.second.offset_ << "." << std::endl; - auto new_size = item.second.offset_ + item.second.size_; + LOG(INFO) << "Blob offset: " << hinfo.offset_ << "." << std::endl; + auto new_size = hinfo.offset_ + hinfo.size_; hapi::Blob existing_data(existing_blob_size); - existing.first.st_bkid->Get(item.second.blob_name_, existing_data, ctx); - existing.first.st_bkid->DeleteBlob(item.second.blob_name_, ctx); + bkt->Get(hinfo.blob_name_, existing_data); + bkt->DeleteBlob(hinfo.blob_name_); if (new_size < existing_blob_size) { new_size = existing_blob_size; } hapi::Blob final_data(new_size); - auto existing_data_cp_size = existing_data.size() >= item.second.offset_ - ? item.second.offset_ - : existing_data.size(); + auto existing_data_cp_size = existing_data.size() >= hinfo.offset_ + ? hinfo.offset_ : existing_data.size(); memcpy(final_data.data(), existing_data.data(), existing_data_cp_size); - if (existing_blob_size < item.second.offset_ + 1 && - fs::exists(filename) && - fs::file_size(filename) >= - item.second.offset_ + item.second.size_) { - size_t size_to_read = item.second.offset_ - existing_blob_size; - LOG(INFO) << "Blob has a gap in update read gap from original file." - << std::endl; - INTERCEPTOR_LIST->hermes_flush_exclusion.insert(filename); - FILE *fh = fopen(filename.c_str(), "r"); - if (fh != nullptr) { - if (fseek(fh, index * kPageSize + existing_data_cp_size, - SEEK_SET) == 0) { - size_t items_read = - fread(final_data.data() + existing_data_cp_size, size_to_read, - sizeof(char), fh); - if (items_read != 1) { - // TODO(hari) @errorhandling read failed. - } - if (fclose(fh) != 0) { - // TODO(hari) @errorhandling fclose failed. - } - } else { - // TODO(hari) @errorhandling fseek failed. - } - } else { - // TODO(hari) @errorhandling FILE cannot be opened - } - INTERCEPTOR_LIST->hermes_flush_exclusion.erase(filename); + if (existing_blob_size < hinfo.offset_ + 1) { + ReadGap(filename, offset + existing_data_cp_size, + final_data.data() + existing_data_cp_size, + hinfo.offset_ - existing_blob_size, + hinfo.offset_ + hinfo.size_); } - memcpy(final_data.data() + item.second.offset_, put_data_ptr, + memcpy(final_data.data() + hinfo.offset_, put_data_ptr, put_data_ptr_size); - if (item.second.offset_ + item.second.size_ < existing_blob_size) { + + if (hinfo.offset_ + hinfo.size_ < existing_blob_size) { LOG(INFO) << "Retain last portion of blob as Blob is bigger than the " - "update." - << std::endl; - auto off_t = item.second.offset_ + item.second.size_; + "update." << std::endl; + auto off_t = hinfo.offset_ + hinfo.size_; memcpy(final_data.data() + off_t, existing_data.data() + off_t, existing_blob_size - off_t); } - auto status = existing.first.st_bkid->Put(item.second.blob_name_, - final_data, ctx); - if (status.Failed()) { - perform_file_write(filename, index * kPageSize, new_size, - final_data.data()); - } else { - existing.first.st_blobs.emplace(item.second.blob_name_); - } + + PutWithPosixFallback(stat, hinfo.blob_name_, filename, + final_data.data(), final_data.size(), offset); } } - data_offset += item.first.size_; + data_offset += finfo.size_; + + // TODO(chogan): + // if (PersistEagerly(filename)) { + // hapi::Trait *trait = stat.st_vbkt->GetTrait(hapi::TraitType::PERSIST); + // if (trait) { + // hapi::PersistTrait *persist_trait = (hapi::PersistTrait *)trait; + // persist_trait->file_mapping.offset_map.emplace(hinfo.blob_name_, + // offset); + // } + + // stat.st_vbkt->Link(hinfo.blob_name_, filename); + // } } - existing.first.st_ptr += data_offset; - existing.first.st_size = existing.first.st_size >= existing.first.st_ptr - ? existing.first.st_size - : existing.first.st_ptr; + stat.st_ptr += data_offset; + stat.st_size = stat.st_size >= stat.st_ptr ? stat.st_size : stat.st_ptr; + struct timespec ts; timespec_get(&ts, TIME_UTC); - existing.first.st_mtim = ts; - existing.first.st_ctim = ts; - mdm->Update(fp, existing.first); + stat.st_mtim = ts; + stat.st_ctim = ts; + mdm->Update(fp, stat); ret = data_offset; + return ret; } @@ -572,7 +530,7 @@ ssize_t HERMES_DECL(write)(int fd, const void *buf, size_t count) { auto existing = mdm->Find(fd); if (existing.second) { LOG(INFO) << "Intercept write." << std::endl; - ret = write_internal(existing, buf, count, fd); + ret = write_internal(existing.first, buf, count, fd); } else { MAP_OR_FAIL(write); ret = real_write_(fd, buf, count); @@ -614,7 +572,7 @@ ssize_t HERMES_DECL(pwrite)(int fd, const void *buf, size_t count, LOG(INFO) << "Intercept pwrite." << std::endl; int status = lseek(fd, offset, SEEK_SET); if (status == 0) { - ret = write_internal(existing, buf, count, fd); + ret = write_internal(existing.first, buf, count, fd); } } else { MAP_OR_FAIL(pwrite); @@ -657,7 +615,7 @@ ssize_t HERMES_DECL(pwrite64)(int fd, const void *buf, size_t count, LOG(INFO) << "Intercept pwrite." << std::endl; int status = lseek(fd, offset, SEEK_SET); if (status == 0) { - ret = write_internal(existing, buf, count, fd); + ret = write_internal(existing.first, buf, count, fd); } } else { MAP_OR_FAIL(pwrite); diff --git a/adapter/src/hermes/adapter/stdio/CMakeLists.txt b/adapter/src/hermes/adapter/stdio/CMakeLists.txt index 9bd79cbe9..b1ae5151f 100644 --- a/adapter/src/hermes/adapter/stdio/CMakeLists.txt +++ b/adapter/src/hermes/adapter/stdio/CMakeLists.txt @@ -17,18 +17,11 @@ set(STDIO_ADAPTER_PRIVATE_HEADER ${CMAKE_CURRENT_SOURCE_DIR}/metadata_manager.h ${CMAKE_CURRENT_SOURCE_DIR}/common/enumerations.h ${CMAKE_CURRENT_SOURCE_DIR}/common/constants.h) -set(STDIO_INTERNAL_ADAPTER_SRC ${CMAKE_CURRENT_SOURCE_DIR}/metadata_manager.cc - ${CMAKE_CURRENT_SOURCE_DIR}/mapper/balanced_mapper.cc) - # Add library hermes_stdio add_library(hermes_stdio SHARED ${STDIO_ADAPTER_PRIVATE_HEADER} ${STDIO_ADAPTER_PUBLIC_HEADER} ${STDIO_ADAPTER_SRC}) add_dependencies(hermes_stdio hermes) target_link_libraries(hermes_stdio hermes MPI::MPI_CXX) -add_library(hermes_stdio_internal SHARED ${STDIO_ADAPTER_PRIVATE_HEADER} ${STDIO_ADAPTER_PUBLIC_HEADER} ${STDIO_INTERNAL_ADAPTER_SRC}) -add_dependencies(hermes_stdio_internal hermes) -target_link_libraries(hermes_stdio_internal hermes MPI::MPI_CXX) - #----------------------------------------------------------------------------- # Add Target(s) to CMake Install #----------------------------------------------------------------------------- diff --git a/adapter/src/hermes/adapter/stdio/common/constants.h b/adapter/src/hermes/adapter/stdio/common/constants.h index ceba40aa5..320236a9a 100644 --- a/adapter/src/hermes/adapter/stdio/common/constants.h +++ b/adapter/src/hermes/adapter/stdio/common/constants.h @@ -13,19 +13,8 @@ #ifndef HERMES_STDIO_COMMON_CONSTANTS_H #define HERMES_STDIO_COMMON_CONSTANTS_H -/** - * Standard header - */ #include -/** - * Dependent library header - */ -#include "glog/logging.h" - -/** - * Internal header - */ #include /** @@ -38,28 +27,6 @@ using hermes::adapter::stdio::MapperType; */ const MapperType kMapperType = MapperType::BALANCED; -/** - * Define kPageSize for balanced mapping. - */ -const size_t kPageSize = []() { - const char *kPageSizeVar = "HERMES_PAGE_SIZE"; - const size_t kDefaultPageSize = 1 * 1024 * 1024; - - size_t result = kDefaultPageSize; - char *page_size = getenv(kPageSizeVar); - - if (page_size) { - result = (size_t)std::strtoull(page_size, NULL, 0); - if (result == 0) { - LOG(FATAL) << "Invalid value of " << kPageSizeVar << ": " << page_size; - } - } - - LOG(INFO) << "Stdio adapter page size: " << result << "\n"; - - return result; -}(); - /** * String delimiter */ diff --git a/adapter/src/hermes/adapter/stdio/stdio.cc b/adapter/src/hermes/adapter/stdio/stdio.cc index 197b1ec0d..d0fbd192b 100644 --- a/adapter/src/hermes/adapter/stdio/stdio.cc +++ b/adapter/src/hermes/adapter/stdio/stdio.cc @@ -16,6 +16,7 @@ #include #include +#include #include #include @@ -214,42 +215,11 @@ FILE *reopen_internal(const std::string &path_str, const char *mode, return ret; } -void ReadGap(const std::string &filename, size_t seek_offset, u8 *read_ptr, - size_t read_size, size_t file_bounds) { - if (fs::exists(filename) && - fs::file_size(filename) >= file_bounds) { - LOG(INFO) << "Blob has a gap in write. Read gap from original file.\n"; - INTERCEPTOR_LIST->hermes_flush_exclusion.insert(filename); - int fd = open(filename.c_str(), O_RDONLY); - if (fd) { - if (flock(fd, LOCK_SH) == -1) { - hermes::FailedLibraryCall("flock"); - } - - ssize_t bytes_read = pread(fd, read_ptr, read_size, seek_offset); - if (bytes_read == -1 || (size_t)bytes_read != read_size) { - hermes::FailedLibraryCall("pread"); - } - - if (flock(fd, LOCK_UN) == -1) { - hermes::FailedLibraryCall("flock"); - } - - if (close(fd) != 0) { - hermes::FailedLibraryCall("close"); - } - } else { - hermes::FailedLibraryCall("open"); - } - INTERCEPTOR_LIST->hermes_flush_exclusion.erase(filename); - } -} - void PutWithStdioFallback(AdapterStat &stat, const std::string &blob_name, const std::string &filename, u8 *data, size_t size, size_t offset) { hapi::Context ctx; - const char *hermes_write_only = getenv("HERMES_WRITE_ONLY"); + const char *hermes_write_only = getenv(kHermesWriteOnlyVar); if (hermes_write_only && hermes_write_only[0] == '1') { // Custom DPE for write-only apps like VPIC @@ -274,15 +244,16 @@ size_t write_internal(AdapterStat &stat, const void *ptr, size_t total_size, LOG(INFO) << "Write called for filename: " << filename << " on offset: " << stat.st_ptr << " and size: " << total_size << std::endl; - size_t ret; + + size_t ret = 0; auto mdm = hermes::adapter::Singleton::GetInstance(); auto mapper = MapperFactory().Get(kMapperType); auto mapping = mapper->map( FileStruct(mdm->Convert(fp), stat.st_ptr, total_size)); size_t data_offset = 0; - LOG(INFO) << "Mapping for write has " << mapping.size() << " mappings." - << std::endl; - for (const auto& [finfo, hinfo] : mapping) { + LOG(INFO) << "Mapping for write has " << mapping.size() << " mappings.\n"; + + for (const auto &[finfo, hinfo] : mapping) { auto index = std::stol(hinfo.blob_name_) - 1; size_t offset = index * kPageSize; auto blob_exists = bkt->ContainsBlob(hinfo.blob_name_); @@ -293,17 +264,16 @@ size_t write_internal(AdapterStat &stat, const void *ptr, size_t total_size, LOG(INFO) << "Create or Overwrite blob " << hinfo.blob_name_ << " of size:" << hinfo.size_ << "." << std::endl; if (hinfo.size_ == kPageSize) { - PutWithStdioFallback(stat, hinfo.blob_name_, filename, - put_data_ptr, put_data_ptr_size, - finfo.offset_); + PutWithStdioFallback(stat, hinfo.blob_name_, filename, put_data_ptr, + put_data_ptr_size, finfo.offset_); } else if (hinfo.offset_ == 0) { - PutWithStdioFallback(stat, hinfo.blob_name_, filename, - put_data_ptr, put_data_ptr_size, offset); + PutWithStdioFallback(stat, hinfo.blob_name_, filename, put_data_ptr, + put_data_ptr_size, offset); } else { hapi::Blob final_data(hinfo.offset_ + hinfo.size_); - ReadGap(filename, index * kPageSize, final_data.data(), - hinfo.offset_, hinfo.offset_); + ReadGap(filename, offset, final_data.data(), hinfo.offset_, + hinfo.offset_); memcpy(final_data.data() + hinfo.offset_, put_data_ptr, put_data_ptr_size); PutWithStdioFallback(stat, hinfo.blob_name_, filename, @@ -346,8 +316,8 @@ size_t write_internal(AdapterStat &stat, const void *ptr, size_t total_size, ? hinfo.offset_ : existing_data.size(); memcpy(final_data.data(), existing_data.data(), existing_data_cp_size); - if (existing_blob_size < hinfo.offset_) { - ReadGap(filename, index * kPageSize + existing_data_cp_size, + if (existing_blob_size < hinfo.offset_ + 1) { + ReadGap(filename, offset + existing_data_cp_size, final_data.data() + existing_data_cp_size, hinfo.offset_ - existing_blob_size, hinfo.offset_ + hinfo.size_); @@ -362,6 +332,7 @@ size_t write_internal(AdapterStat &stat, const void *ptr, size_t total_size, memcpy(final_data.data() + off_t, existing_data.data() + off_t, existing_blob_size - off_t); } + PutWithStdioFallback(stat, hinfo.blob_name_, filename, final_data.data(), final_data.size(), offset); } diff --git a/adapter/src/hermes/adapter/utils.cc b/adapter/src/hermes/adapter/utils.cc new file mode 100644 index 000000000..fc79da1d3 --- /dev/null +++ b/adapter/src/hermes/adapter/utils.cc @@ -0,0 +1,35 @@ +#include + +using hermes::u8; +namespace fs = std::experimental::filesystem; + +void ReadGap(const std::string &filename, size_t seek_offset, u8 *read_ptr, + size_t read_size, size_t file_bounds) { + if (fs::exists(filename) && + fs::file_size(filename) >= file_bounds) { + LOG(INFO) << "Blob has a gap in write. Read gap from original file.\n"; + INTERCEPTOR_LIST->hermes_flush_exclusion.insert(filename); + int fd = open(filename.c_str(), O_RDONLY); + if (fd) { + if (flock(fd, LOCK_SH) == -1) { + hermes::FailedLibraryCall("flock"); + } + + ssize_t bytes_read = pread(fd, read_ptr, read_size, seek_offset); + if (bytes_read == -1 || (size_t)bytes_read != read_size) { + hermes::FailedLibraryCall("pread"); + } + + if (flock(fd, LOCK_UN) == -1) { + hermes::FailedLibraryCall("flock"); + } + + if (close(fd) != 0) { + hermes::FailedLibraryCall("close"); + } + } else { + hermes::FailedLibraryCall("open"); + } + INTERCEPTOR_LIST->hermes_flush_exclusion.erase(filename); + } +} diff --git a/adapter/test/CMakeLists.txt b/adapter/test/CMakeLists.txt index e84f471d5..b0c4ac60c 100644 --- a/adapter/test/CMakeLists.txt +++ b/adapter/test/CMakeLists.txt @@ -18,7 +18,9 @@ add_dependencies(hermes_daemon hermes) function(gcc exec args) add_test(NAME Test${exec} COMMAND "${CMAKE_BINARY_DIR}/bin/${exec}" ${args}) set_property(TEST Test${exec} - PROPERTY ENVIRONMENT LSAN_OPTIONS=suppressions=${CMAKE_SOURCE_DIR}/test/data/asan.supp) + PROPERTY ENVIRONMENT LSAN_OPTIONS=suppressions=${CMAKE_SOURCE_DIR}/test/data/asan.supp) + set_property(TEST Test${exec} APPEND + PROPERTY ENVIRONMENT HERMES_CONF=${CMAKE_SOURCE_DIR}/adapter/test/data/hermes.conf) endfunction() function(mpi exec mpi_proc args) diff --git a/adapter/test/hermes_daemon.cc b/adapter/test/hermes_daemon.cc index 8854ddad9..aa0783c93 100644 --- a/adapter/test/hermes_daemon.cc +++ b/adapter/test/hermes_daemon.cc @@ -1,7 +1,7 @@ -#include - #include +#include + #include "hermes.h" /** diff --git a/adapter/test/stdio/CMakeLists.txt b/adapter/test/stdio/CMakeLists.txt index 2990b9ddd..b6deeb8d3 100644 --- a/adapter/test/stdio/CMakeLists.txt +++ b/adapter/test/stdio/CMakeLists.txt @@ -30,8 +30,8 @@ endfunction() #------------------------------------------------------------------------------ add_executable(stdio_adapter_mapper_test stdio_adapter_mapper_test.cpp ${ADAPTER_COMMON}) target_link_libraries(stdio_adapter_mapper_test - hermes_stdio_internal Catch2::Catch2 -lstdc++fs -ldl -lc MPI::MPI_CXX) -add_dependencies(stdio_adapter_mapper_test hermes_stdio_internal) + hermes_stdio Catch2::Catch2 -lstdc++fs -ldl -lc MPI::MPI_CXX) +add_dependencies(stdio_adapter_mapper_test hermes_stdio) gcc(stdio_adapter_mapper_test "") #------------------------------------------------------------------------------ diff --git a/adapter/test/stdio/stdio_adapter_mapper_test.cpp b/adapter/test/stdio/stdio_adapter_mapper_test.cpp index 4d8f3e5fb..10ec65853 100644 --- a/adapter/test/stdio/stdio_adapter_mapper_test.cpp +++ b/adapter/test/stdio/stdio_adapter_mapper_test.cpp @@ -11,6 +11,7 @@ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ #include +#include #include #include #include @@ -49,11 +50,15 @@ hermes::adapter::stdio::test::Arguments args; hermes::adapter::stdio::test::Info info; int init(int* argc, char*** argv) { - (void)argc; - (void)argv; + MPI_Init(argc, argv); + + return 0; +} +int finalize() { + MPI_Finalize(); + return 0; } -int finalize() { return 0; } int pretest() { fs::path fullpath = args.directory;