diff --git a/.gitignore b/.gitignore index b0478142c..2f5946c38 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,4 @@ __pycache__/ /cmake-build-debug-system/ /src/adapter/posix/cmake-build-debug-system/CMakeFiles/clion-log.txt /adapter/test/posix/Testing/Temporary/LastTest.log +/.cache/ diff --git a/adapter/CMakeLists.txt b/adapter/CMakeLists.txt index 83a1bfb7b..6e3848f5f 100644 --- a/adapter/CMakeLists.txt +++ b/adapter/CMakeLists.txt @@ -10,3 +10,6 @@ add_subdirectory(src/hermes/adapter/stdio) # add posix adapter add_subdirectory(src/hermes/adapter/posix) + +# add mpiio adapter +add_subdirectory(src/hermes/adapter/mpiio) diff --git a/adapter/include/hermes/adapter/mpiio.h b/adapter/include/hermes/adapter/mpiio.h index a55f12eb6..b24599e18 100644 --- a/adapter/include/hermes/adapter/mpiio.h +++ b/adapter/include/hermes/adapter/mpiio.h @@ -13,6 +13,111 @@ #ifndef HERMES_MPIIO_H #define HERMES_MPIIO_H -class mpiio {}; +/** + * Standard header + */ +#include +#include +#include + +#include + +/** + * Dependent library headers + */ +#include +#include + +#include "glog/logging.h" + +/** + * Internal headers + */ +#include +#include +#include +#include +#include +#include + +#include +#include + +/** + * Function declaration + */ +HERMES_FORWARD_DECL(MPI_File_close, int, (MPI_File * fh)); +HERMES_FORWARD_DECL(MPI_File_iread_at, int, + (MPI_File fh, MPI_Offset offset, void *buf, int count, + MPI_Datatype datatype, MPI_Request *request)); +HERMES_FORWARD_DECL(MPI_File_iread, int, + (MPI_File fh, void *buf, int count, MPI_Datatype datatype, + MPI_Request *request)); +HERMES_FORWARD_DECL(MPI_File_iread_shared, int, + (MPI_File fh, void *buf, int count, MPI_Datatype datatype, + MPI_Request *request)); +HERMES_FORWARD_DECL(MPI_File_iwrite_at, int, + (MPI_File fh, MPI_Offset offset, const void *buf, int count, + MPI_Datatype datatype, MPI_Request *request)); +HERMES_FORWARD_DECL(MPI_File_iwrite, int, + (MPI_File fh, const void *buf, int count, + MPI_Datatype datatype, MPI_Request *request)); +HERMES_FORWARD_DECL(MPI_File_iwrite_shared, int, + (MPI_File fh, const void *buf, int count, + MPI_Datatype datatype, MPI_Request *request)); +HERMES_FORWARD_DECL(MPI_File_open, int, + (MPI_Comm comm, const char *filename, int amode, + MPI_Info info, MPI_File *fh)); + +HERMES_FORWARD_DECL(MPI_File_read_all, int, + (MPI_File fh, void *buf, int count, MPI_Datatype datatype, + MPI_Status *status)); +HERMES_FORWARD_DECL(MPI_File_read_at_all, int, + (MPI_File fh, MPI_Offset offset, void *buf, int count, + MPI_Datatype datatype, MPI_Status *status)); + +HERMES_FORWARD_DECL(MPI_File_read_at, int, + (MPI_File fh, MPI_Offset offset, void *buf, int count, + MPI_Datatype datatype, MPI_Status *status)); +HERMES_FORWARD_DECL(MPI_File_read, int, + (MPI_File fh, void *buf, int count, MPI_Datatype datatype, + MPI_Status *status)); + +HERMES_FORWARD_DECL(MPI_File_read_ordered, int, + (MPI_File fh, void *buf, int count, MPI_Datatype datatype, + MPI_Status *status)); +HERMES_FORWARD_DECL(MPI_File_read_shared, int, + (MPI_File fh, void *buf, int count, MPI_Datatype datatype, + MPI_Status *status)); +HERMES_FORWARD_DECL(MPI_File_sync, int, (MPI_File fh)); +HERMES_FORWARD_DECL(MPI_File_write_all, int, + (MPI_File fh, const void *buf, int count, + MPI_Datatype datatype, MPI_Status *status)); +HERMES_FORWARD_DECL(MPI_File_write_at_all, int, + (MPI_File fh, MPI_Offset offset, const void *buf, int count, + MPI_Datatype datatype, MPI_Status *status)); +HERMES_FORWARD_DECL(MPI_File_write_at, int, + (MPI_File fh, MPI_Offset offset, 
const void *buf, int count, + MPI_Datatype datatype, MPI_Status *status)); +HERMES_FORWARD_DECL(MPI_File_write, int, + (MPI_File fh, const void *buf, int count, + MPI_Datatype datatype, MPI_Status *status)); +HERMES_FORWARD_DECL(MPI_File_write_ordered, int, + (MPI_File fh, const void *buf, int count, + MPI_Datatype datatype, MPI_Status *status)); +HERMES_FORWARD_DECL(MPI_File_write_shared, int, + (MPI_File fh, const void *buf, int count, + MPI_Datatype datatype, MPI_Status *status)); +HERMES_FORWARD_DECL(MPI_File_seek, int, + (MPI_File fh, MPI_Offset offset, int whence)); +HERMES_FORWARD_DECL(MPI_File_seek_shared, int, + (MPI_File fh, MPI_Offset offset, int whence)); +HERMES_FORWARD_DECL(MPI_File_get_position, int, + (MPI_File fh, MPI_Offset *offset)); +/** + * MPI functions declarations + */ +HERMES_FORWARD_DECL(MPI_Init, int, (int *argc, char ***argv)); +HERMES_FORWARD_DECL(MPI_Finalize, int, (void)); #endif // HERMES_MPIIO_H diff --git a/adapter/src/hermes/adapter/interceptor.h b/adapter/src/hermes/adapter/interceptor.h index 6033b5c0c..920f63328 100644 --- a/adapter/src/hermes/adapter/interceptor.h +++ b/adapter/src/hermes/adapter/interceptor.h @@ -60,7 +60,7 @@ const char* kPathInclusions[] = {"/var/opt/cray/dws/mounts/"}; /** * Splits a string given a delimiter */ -std::vector StringSplit(char* str, char delimiter) { +inline std::vector StringSplit(char* str, char delimiter) { std::stringstream ss(str); std::vector v; while (ss.good()) { @@ -70,7 +70,7 @@ std::vector StringSplit(char* str, char delimiter) { } return v; } -std::string GetFilenameFromFP(FILE* fh) { +inline std::string GetFilenameFromFP(FILE* fh) { const int kMaxSize = 0xFFF; char proclnk[kMaxSize]; char filename[kMaxSize]; diff --git a/adapter/src/hermes/adapter/mpiio/CMakeLists.txt b/adapter/src/hermes/adapter/mpiio/CMakeLists.txt new file mode 100644 index 000000000..5b28cad54 --- /dev/null +++ b/adapter/src/hermes/adapter/mpiio/CMakeLists.txt @@ -0,0 +1,49 @@ +project(MPIIOAdapter VERSION 0.1.0) + +# include directories for mpiio headers +include_directories(${CMAKE_SOURCE_DIR}/adapter/include) + +# MPIIO src code. We only include mpiio.cc as it includes other cc to reduce compilation time. +set(MPIIO_ADAPTER_SRC mpiio.cc) + +# Only mpiio.h is the public adapter. 
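+# Usage note (a sketch, not part of the build): as with the other Hermes
+# adapters, hermes_mpiio is intended to be interposed on the application's
+# MPI-IO calls at runtime (e.g. by preloading the shared library), with the
+# Hermes configuration file supplied through the environment variable named
+# by kHermesConf (see metadata_manager.h). The exact launch command depends
+# on the MPI implementation and is an assumption here.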
+set(MPIIO_ADAPTER_PUBLIC_HEADER ${CMAKE_SOURCE_DIR}/adapter/include/hermes/adapter/mpiio.h) +# Private headers +set(MPIIO_ADAPTER_PRIVATE_HEADER ${CMAKE_CURRENT_SOURCE_DIR}/metadata_manager.h + ${CMAKE_CURRENT_SOURCE_DIR}/mapper/mapper_factory.h + ${CMAKE_CURRENT_SOURCE_DIR}/mapper/abstract_mapper.h + ${CMAKE_CURRENT_SOURCE_DIR}/mapper/balanced_mapper.h + ${CMAKE_CURRENT_SOURCE_DIR}/common/datastructures.h + ${CMAKE_CURRENT_SOURCE_DIR}/common/enumerations.h + ${CMAKE_CURRENT_SOURCE_DIR}/common/constants.h) + +set(MPIIO_INTERNAL_ADAPTER_SRC ${CMAKE_CURRENT_SOURCE_DIR}/metadata_manager.cc + ${CMAKE_CURRENT_SOURCE_DIR}/mapper/balanced_mapper.cc) + +# Add library hermes_mpiio +add_library(hermes_mpiio SHARED ${MPIIO_ADAPTER_PRIVATE_HEADER} ${MPIIO_ADAPTER_PUBLIC_HEADER} ${MPIIO_ADAPTER_SRC}) +add_dependencies(hermes_mpiio hermes) +target_link_libraries(hermes_mpiio hermes MPI::MPI_CXX) + +add_library(hermes_mpiio_internal SHARED ${MPIIO_ADAPTER_PRIVATE_HEADER} ${MPIIO_ADAPTER_PUBLIC_HEADER} ${MPIIO_INTERNAL_ADAPTER_SRC}) +add_dependencies(hermes_mpiio_internal hermes) +target_link_libraries(hermes_mpiio_internal hermes MPI::MPI_CXX) + +#----------------------------------------------------------------------------- +# Add Target(s) to CMake Install +#----------------------------------------------------------------------------- +install( + TARGETS + hermes_mpiio + EXPORT + ${HERMES_EXPORTED_TARGETS} + LIBRARY DESTINATION ${HERMES_INSTALL_LIB_DIR} + ARCHIVE DESTINATION ${HERMES_INSTALL_LIB_DIR} + RUNTIME DESTINATION ${HERMES_INSTALL_BIN_DIR} +) +#----------------------------------------------------------------------------- +# Add Target(s) to Coverage +#----------------------------------------------------------------------------- +if(HERMES_ENABLE_COVERAGE) + set_coverage_flags(hermes_mpiio) +endif() \ No newline at end of file diff --git a/adapter/src/hermes/adapter/mpiio/common/constants.h b/adapter/src/hermes/adapter/mpiio/common/constants.h new file mode 100644 index 000000000..3e3d8f3f1 --- /dev/null +++ b/adapter/src/hermes/adapter/mpiio/common/constants.h @@ -0,0 +1,47 @@ +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * Distributed under BSD 3-Clause license. * + * Copyright by The HDF Group. * + * Copyright by the Illinois Institute of Technology. * + * All rights reserved. * + * * + * This file is part of Hermes. The full Hermes copyright notice, including * + * terms governing use, modification, and redistribution, is contained in * + * the COPYING file, which can be found at the top directory. If you do not * + * have access to the file, you may request a copy from help@hdfgroup.org. * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +#ifndef HERMES_MPIIO_COMMON_CONSTANTS_H +#define HERMES_MPIIO_COMMON_CONSTANTS_H + +/** + * Standard header + */ + +/** + * Dependent library header + */ + +/** + * Internal header + */ +#include + +/** + * Constants file for MPIIO adapter. + */ +using hermes::adapter::mpiio::MapperType; + +/** + * Which mapper to be used by MPIIO adapter. + */ +const MapperType kMapperType = MapperType::BALANCED; +/** + * Define kPageSize for balanced mapping. 
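+ * For example, with this 1 MiB page size the balanced mapper splits a
+ * 3 MiB request starting at offset 0 into three 1 MiB pieces mapped to
+ * blobs "1", "2", and "3" (see mapper/balanced_mapper.cc).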
+ */ +const size_t kPageSize = 1024 * 1024; +/** + * String delimiter + */ +const char kStringDelimiter = '#'; + +#endif // HERMES_MPIIO_COMMON_CONSTANTS_H diff --git a/adapter/src/hermes/adapter/mpiio/common/datastructures.h b/adapter/src/hermes/adapter/mpiio/common/datastructures.h new file mode 100644 index 000000000..3d72a95f9 --- /dev/null +++ b/adapter/src/hermes/adapter/mpiio/common/datastructures.h @@ -0,0 +1,165 @@ +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * Distributed under BSD 3-Clause license. * + * Copyright by The HDF Group. * + * Copyright by the Illinois Institute of Technology. * + * All rights reserved. * + * * + * This file is part of Hermes. The full Hermes copyright notice, including * + * terms governing use, modification, and redistribution, is contained in * + * the COPYING file, which can be found at the top directory. If you do not * + * have access to the file, you may request a copy from help@hdfgroup.org. * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +#ifndef HERMES_MPIIO_ADAPTER_DATASTRUCTURES_H +#define HERMES_MPIIO_ADAPTER_DATASTRUCTURES_H + +/** + * Standard header + */ +#include + +#include + +/** + * Dependent library header + */ + +/** + * Internal header + */ +#include +#include +#include +#include + +/** + * Namespace simplification. + */ +namespace hapi = hermes::api; + +namespace hermes::adapter::mpiio { + +/** + * Structure MPIIO adapter uses to define a file state. + */ +struct FileStruct { + /** + * attributes + */ + MPI_File *file_id_; // fileID to identify a file uniquely. + size_t offset_; // file pointer within the file. + size_t size_; // size of data refered in file. + /** + * Constructor + */ + FileStruct() : file_id_(), offset_(0), size_(0) {} /* default constructor */ + FileStruct(MPI_File *file_id, size_t offset, size_t size) + : file_id_(file_id), + offset_(offset), + size_(size) {} /* parameterized constructor */ + FileStruct(const FileStruct &other) + : file_id_(other.file_id_), + offset_(other.offset_), + size_(other.size_) {} /* copy constructor*/ + FileStruct(FileStruct &&other) + : file_id_(other.file_id_), + offset_(other.offset_), + size_(other.size_) {} /* move constructor*/ + /** + * Operators defined + */ + /* Assignment operator. */ + FileStruct &operator=(const FileStruct &other) { + file_id_ = other.file_id_; + offset_ = other.offset_; + size_ = other.size_; + return *this; + } +}; + +/** + * Structure MPIIO adapter uses to define Hermes blob. + */ +struct HermesStruct { + /** + * attributes + */ + std::string blob_name_; + std::string encoded_blob_name_; + size_t offset_; + size_t size_; + /** + * Constructor + */ + HermesStruct() + : blob_name_(), + encoded_blob_name_(), + offset_(0), + size_(0) {} /* default constructor */ + HermesStruct(const HermesStruct &other) + : blob_name_(other.blob_name_), + encoded_blob_name_(other.encoded_blob_name_), + offset_(other.offset_), + size_(other.size_) {} /* copy constructor*/ + HermesStruct(HermesStruct &&other) + : blob_name_(other.blob_name_), + encoded_blob_name_(other.encoded_blob_name_), + offset_(other.offset_), + size_(other.size_) {} /* move constructor*/ + /** + * Operators defined + */ + /* Assignment operator. 
*/ + HermesStruct &operator=(const HermesStruct &other) { + blob_name_ = other.blob_name_; + encoded_blob_name_ = other.encoded_blob_name_; + offset_ = other.offset_; + size_ = other.size_; + return *this; + } +}; + +typedef std::set + StringSet_t; + +/** + * Stat which defines File within MPIIO Adapter. + */ +struct AdapterStat { + /** + * attributes + */ + std::shared_ptr st_bkid; /* bucket associated with the file */ + StringSet_t st_blobs; /* Blobs access in the bucket */ + StringSet_t st_vbuckets; /* vBuckets used in this file */ + i32 ref_count; /* # of time process opens a file */ + int a_mode; /* access mode */ + MPI_Info info; /* Info object (handle) */ + MPI_Comm comm; /* Communicator for the file.*/ + MPI_Offset size; /* total size, in bytes */ + MPI_Offset ptr; /* Current ptr of FILE */ + bool atomicity; /* Consistency semantics for data-access */ + /** + * Constructor + */ + AdapterStat() + : st_bkid(), + st_blobs(CompareBlobs), + ref_count(), + a_mode(), + info(), + comm(), + size(0), + ptr(0), + atomicity(true) {} /* default constructor */ + /** + * Comparator for comparing two blobs. + */ + static bool CompareBlobs(const std::string &a, const std::string &b) { + return std::stol(a) < std::stol(b); + } +}; + +} // namespace hermes::adapter::mpiio +#endif // HERMES_MPIIO_ADAPTER_DATASTRUCTURES_H diff --git a/adapter/src/hermes/adapter/mpiio/mpiio.cpp b/adapter/src/hermes/adapter/mpiio/common/enumerations.h similarity index 66% rename from adapter/src/hermes/adapter/mpiio/mpiio.cpp rename to adapter/src/hermes/adapter/mpiio/common/enumerations.h index b4db88ffb..d202639f8 100644 --- a/adapter/src/hermes/adapter/mpiio/mpiio.cpp +++ b/adapter/src/hermes/adapter/mpiio/common/enumerations.h @@ -10,4 +10,16 @@ * have access to the file, you may request a copy from help@hdfgroup.org. * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ -#include +#ifndef HERMES_MPIIO_COMMON_ENUMERATIONS_H +#define HERMES_MPIIO_COMMON_ENUMERATIONS_H +/** + * Enumeration for MPIIO adapter. + */ +namespace hermes::adapter::mpiio { +/** + * Define different types of mappers supported by MPIIO Adapter. + * Also define its construction in the MapperFactory. + */ +enum MapperType { BALANCED = 0 /* Balanced Mapping */ }; +} // namespace hermes::adapter::mpiio +#endif // HERMES_MPIIO_COMMON_ENUMERATIONS_H diff --git a/adapter/src/hermes/adapter/mpiio/mapper/abstract_mapper.h b/adapter/src/hermes/adapter/mpiio/mapper/abstract_mapper.h new file mode 100644 index 000000000..30c7d898a --- /dev/null +++ b/adapter/src/hermes/adapter/mpiio/mapper/abstract_mapper.h @@ -0,0 +1,49 @@ +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * Distributed under BSD 3-Clause license. * + * Copyright by The HDF Group. * + * Copyright by the Illinois Institute of Technology. * + * All rights reserved. * + * * + * This file is part of Hermes. The full Hermes copyright notice, including * + * terms governing use, modification, and redistribution, is contained in * + * the COPYING file, which can be found at the top directory. If you do not * + * have access to the file, you may request a copy from help@hdfgroup.org. 
* + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +#ifndef HERMES_MPIIO_ADAPTER_ABSTRACT_ADAPTER_H +#define HERMES_MPIIO_ADAPTER_ABSTRACT_ADAPTER_H +/** + * Standard headers + */ +/** + * Dependent library headers + */ +/** + * Internal headers + */ +#include + +/** + * Typedef to simplify the return types + */ +typedef std::vector> + MapperReturnType; + +namespace hermes::adapter::mpiio { +/** + * Interface to define a mapper. + */ +class AbstractMapper { + public: + /** + * This method maps the current Operation to Hermes data structures. + * + * @param file_op, FileStruct, operations for which we are mapping. + * @return a map of FileStruct to Hermes Struct + */ + virtual MapperReturnType map(const FileStruct& file_op) = 0; +}; +} // namespace hermes::adapter::mpiio + +#endif // HERMES_MPIIO_ADAPTER_ABSTRACT_ADAPTER_H diff --git a/adapter/src/hermes/adapter/mpiio/mapper/balanced_mapper.cc b/adapter/src/hermes/adapter/mpiio/mapper/balanced_mapper.cc new file mode 100644 index 000000000..552edafff --- /dev/null +++ b/adapter/src/hermes/adapter/mpiio/mapper/balanced_mapper.cc @@ -0,0 +1,46 @@ +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * Distributed under BSD 3-Clause license. * + * Copyright by The HDF Group. * + * Copyright by the Illinois Institute of Technology. * + * All rights reserved. * + * * + * This file is part of Hermes. The full Hermes copyright notice, including * + * terms governing use, modification, and redistribution, is contained in * + * the COPYING file, which can be found at the top directory. If you do not * + * have access to the file, you may request a copy from help@hdfgroup.org. * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +#include "balanced_mapper.h" + +/** + * Namespace declaration for cleaner code. + */ +using hermes::adapter::mpiio::BalancedMapper; +using hermes::adapter::mpiio::FileStruct; +using hermes::adapter::mpiio::HermesStruct; + +MapperReturnType BalancedMapper::map(const FileStruct& file_op) { + LOG(INFO) << "Mapping File with offset:" << file_op.offset_ + << " and size:" << file_op.size_ << "." << std::endl; + + auto mapper_return = MapperReturnType(); + size_t size_mapped = 0; + while (file_op.size_ > size_mapped) { + FileStruct file; + file.file_id_ = file_op.file_id_; + HermesStruct hermes; + file.offset_ = file_op.offset_ + size_mapped; + size_t page_index = file.offset_ / kPageSize; + hermes.offset_ = file.offset_ % kPageSize; + auto left_size_page = kPageSize - hermes.offset_; + hermes.size_ = left_size_page < file_op.size_ - size_mapped + ? left_size_page + : file_op.size_ - size_mapped; + + file.size_ = hermes.size_; + hermes.blob_name_ = std::to_string(page_index + 1); + mapper_return.emplace_back(file, hermes); + size_mapped += hermes.size_; + } + return mapper_return; +} diff --git a/adapter/src/hermes/adapter/mpiio/mapper/balanced_mapper.h b/adapter/src/hermes/adapter/mpiio/mapper/balanced_mapper.h new file mode 100644 index 000000000..019406950 --- /dev/null +++ b/adapter/src/hermes/adapter/mpiio/mapper/balanced_mapper.h @@ -0,0 +1,48 @@ +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * Distributed under BSD 3-Clause license. * + * Copyright by The HDF Group. * + * Copyright by the Illinois Institute of Technology. * + * All rights reserved. * + * * + * This file is part of Hermes. 
The full Hermes copyright notice, including * + * terms governing use, modification, and redistribution, is contained in * + * the COPYING file, which can be found at the top directory. If you do not * + * have access to the file, you may request a copy from help@hdfgroup.org. * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +#ifndef HERMES_MPIIO_ADAPTER_BALANCED_MAPPER_H +#define HERMES_MPIIO_ADAPTER_BALANCED_MAPPER_H + +/** + * Standard header + */ +#include + +/** + * Dependent library header + */ + +/** + * Internal header + */ +#include + +#include "abstract_mapper.h" + +namespace hermes::adapter::mpiio { +/** + * Implement balanced mapping + */ +class BalancedMapper : public AbstractMapper { + public: + /** + * This method maps the current Operation to Hermes data structures. + * + * @param file_op, FileStruct, operations for which we are mapping. + * @return a map of FileStruct to Hermes Struct + */ + MapperReturnType map(const FileStruct& file_op) override; +}; +} // namespace hermes::adapter::mpiio + +#endif // HERMES_MPIIO_ADAPTER_BALANCED_MAPPER_H diff --git a/adapter/src/hermes/adapter/mpiio/mapper/mapper_factory.h b/adapter/src/hermes/adapter/mpiio/mapper/mapper_factory.h new file mode 100644 index 000000000..e4ebad958 --- /dev/null +++ b/adapter/src/hermes/adapter/mpiio/mapper/mapper_factory.h @@ -0,0 +1,55 @@ +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * Distributed under BSD 3-Clause license. * + * Copyright by The HDF Group. * + * Copyright by the Illinois Institute of Technology. * + * All rights reserved. * + * * + * This file is part of Hermes. The full Hermes copyright notice, including * + * terms governing use, modification, and redistribution, is contained in * + * the COPYING file, which can be found at the top directory. If you do not * + * have access to the file, you may request a copy from help@hdfgroup.org. * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +#ifndef HERMES_MPIIO_ADAPTER_ADAPTER_FACTORY_H +#define HERMES_MPIIO_ADAPTER_ADAPTER_FACTORY_H + +/** + * Standard header + */ + +/** + * Dependent library header + */ + +/** + * Internal header + */ +#include +#include + +#include "abstract_mapper.h" +#include "balanced_mapper.h" + +namespace hermes::adapter::mpiio { +class MapperFactory { + public: + /** + * Return the instance of mapper given a type. Uses factory pattern. + * + * @param type, MapperType, type of mapper to be used by the MPIIO adapter. + * @return Instance of mapper given a type. + */ + std::shared_ptr Get(const MapperType &type) { + switch (type) { + case MapperType::BALANCED: { + return hermes::adapter::Singleton::GetInstance(); + } + default: { + // TODO(hari): @error_handling Mapper not implemented + } + } + return NULL; + } +}; +} // namespace hermes::adapter::mpiio +#endif // HERMES_MPIIO_ADAPTER_ADAPTER_FACTORY_H diff --git a/adapter/src/hermes/adapter/mpiio/metadata_manager.cc b/adapter/src/hermes/adapter/mpiio/metadata_manager.cc new file mode 100644 index 000000000..528a7016a --- /dev/null +++ b/adapter/src/hermes/adapter/mpiio/metadata_manager.cc @@ -0,0 +1,86 @@ +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * Distributed under BSD 3-Clause license. * + * Copyright by The HDF Group. * + * Copyright by the Illinois Institute of Technology. * + * All rights reserved. * + * * + * This file is part of Hermes. 
The full Hermes copyright notice, including * + * terms governing use, modification, and redistribution, is contained in * + * the COPYING file, which can be found at the top directory. If you do not * + * have access to the file, you may request a copy from help@hdfgroup.org. * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +#include "metadata_manager.h" + + + +/** + * Namespace declarations for cleaner code. + */ +using hermes::adapter::mpiio::AdapterStat; +using hermes::adapter::mpiio::HermesStruct; +using hermes::adapter::mpiio::MetadataManager; + +bool MetadataManager::Create(MPI_File *fh, const AdapterStat &stat) { + LOG(INFO) << "Create metadata for file handler." << std::endl; + auto ret = metadata.emplace(*fh, stat); + return ret.second; +} + +bool MetadataManager::Update(MPI_File *fh, const AdapterStat &stat) { + LOG(INFO) << "Update metadata for file handler." << std::endl; + auto iter = metadata.find(*fh); + if (iter != metadata.end()) { + metadata.erase(iter); + auto ret = metadata.emplace(*fh, stat); + return ret.second; + } else { + return false; + } +} + +std::pair MetadataManager::Find(MPI_File *fh) { + typedef std::pair MetadataReturn; + auto iter = metadata.find(*fh); + if (iter == metadata.end()) + return MetadataReturn(AdapterStat(), false); + else + return MetadataReturn(iter->second, true); +} + +bool MetadataManager::Delete(MPI_File *fh) { + LOG(INFO) << "Delete metadata for file handler." << std::endl; + auto iter = metadata.find(*fh); + if (iter != metadata.end()) { + metadata.erase(iter); + return true; + } else { + return false; + } +} + +std::string MetadataManager::EncodeBlobNameLocal(HermesStruct hermes_struct) { + LOG(INFO) << "Encode Blob:" << hermes_struct.blob_name_ + << " for hermes blobs." << std::endl; + return hermes_struct.blob_name_ + kStringDelimiter + + std::to_string(hermes_struct.offset_) + kStringDelimiter + + std::to_string(hermes_struct.size_) + kStringDelimiter + + std::to_string(rank); +} + +std::pair MetadataManager::DecodeBlobNameLocal( + std::string &encoded_blob_name) { + HermesStruct hermes_struct; + auto str_split = + hermes::adapter::StringSplit(encoded_blob_name.data(), kStringDelimiter); + hermes_struct.encoded_blob_name_ = encoded_blob_name; + hermes_struct.blob_name_ = encoded_blob_name; + int blob_rank = -1; + if (str_split.size() == 4) { + hermes_struct.blob_name_ = str_split[0]; + hermes_struct.offset_ = std::stoi(str_split[1]); + hermes_struct.size_ = std::stoi(str_split[2]); + blob_rank = std::stoi(str_split[3]); + } + return std::pair(blob_rank, hermes_struct); +} diff --git a/adapter/src/hermes/adapter/mpiio/metadata_manager.h b/adapter/src/hermes/adapter/mpiio/metadata_manager.h new file mode 100644 index 000000000..8c63d0959 --- /dev/null +++ b/adapter/src/hermes/adapter/mpiio/metadata_manager.h @@ -0,0 +1,161 @@ +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * Distributed under BSD 3-Clause license. * + * Copyright by The HDF Group. * + * Copyright by the Illinois Institute of Technology. * + * All rights reserved. * + * * + * This file is part of Hermes. The full Hermes copyright notice, including * + * terms governing use, modification, and redistribution, is contained in * + * the COPYING file, which can be found at the top directory. If you do not * + * have access to the file, you may request a copy from help@hdfgroup.org. 
* + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +#ifndef HERMES_MPIIO_ADAPTER_METADATA_MANAGER_H +#define HERMES_MPIIO_ADAPTER_METADATA_MANAGER_H + +/** + * Standard headers + */ +#include + +#include + +/** + * Internal headers + */ +#include +#include +#include +#include +#include +#include + +namespace hermes::adapter::mpiio { +/** + * Metadata manager for MPIIO adapter + */ +class MetadataManager { + private: + /** + * Private members + */ + /** + * Maintain a local metadata FileID structure mapped to Adapter Stats. + */ + std::unordered_map metadata; + /** + * hermes attribute to initialize Hermes + */ + std::shared_ptr hermes; + /** + * references of how many times hermes was tried to initialize. + */ + std::atomic ref; + + public: + /** + * MPI attributes + */ + int rank; + int comm_size; + + /** + * Constructor + */ + MetadataManager() : metadata(), ref(0), rank(0), comm_size(1) {} + /** + * Get the instance of hermes. + */ + std::shared_ptr& GetHermes() { return hermes; } + + /** + * Initialize hermes. Get the kHermesConf from environment else get_env + * returns NULL which is handled internally by hermes. Initialize hermes in + * daemon mode. Keep a reference of how many times Initialize is called. + * Within the adapter, Initialize is called from fopen. + */ + void InitializeHermes() { + if (ref == 0) { + char* hermes_config = getenv(kHermesConf); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &comm_size); + if (comm_size > 1) { + hermes = hermes::InitHermesClient(hermes_config); + } else { + hermes = hermes::InitHermesDaemon(hermes_config); + } + INTERCEPTOR_LIST->SetupAdapterMode(); + } + ref++; + } + /** + * Finalize hermes and close rpc if reference is equal to one. Else just + * decrement the ref counter. + */ + void FinalizeHermes() { + if (ref == 1) { + if (this->comm_size > 1) { + MPI_Barrier(MPI_COMM_WORLD); + if (this->rank == 0) { + hermes->RemoteFinalize(); + } + hermes->Finalize(); + } else { + hermes->Finalize(true); + } + } + ref--; + } + + /** + * Create a metadata entry for MPIIO adapter for a given file handler. + * @param fh, FILE*, original file handler of the file on the destination + * filesystem. + * @param stat, AdapterStat, MPIIO Adapter version of Stat data structure. + * @return true, if operation was successful. + * false, if operation was unsuccessful. + */ + bool Create(MPI_File* fh, const AdapterStat& stat); + + /** + * Update existing metadata entry for MPIIO adapter for a given file handler. + * @param fh, FILE*, original file handler of the file on the destination. + * @param stat, AdapterStat, MPIIO Adapter version of Stat data structure. + * @return true, if operation was successful. + * false, if operation was unsuccessful or entry doesn't exist. + */ + bool Update(MPI_File* fh, const AdapterStat& stat); + + /** + * Delete existing metadata entry for MPIIO adapter for a given file handler. + * @param fh, FILE*, original file handler of the file on the destination. + * @return true, if operation was successful. + * false, if operation was unsuccessful. + */ + bool Delete(MPI_File* fh); + + /** + * Find existing metadata entry for MPIIO adapter for a given file handler. + * @param fh, FILE*, original file handler of the file on the destination. + * @return The metadata entry if exist. + * The bool in pair indicated whether metadata entry exists. + */ + std::pair Find(MPI_File* fh); + /** + * Encode a given Hermes Struct from Mapping engine to Local Blob Name. 
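+ * The encoded name is the page blob name, offset, size, and MPI rank joined
+ * by kStringDelimiter ('#'). For example (hypothetical values), blob "3"
+ * with offset 512 and size 1024 written by rank 2 encodes to "3#512#1024#2",
+ * and DecodeBlobNameLocal recovers the same fields from that string.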
+ * @param hermes_struct, HermesStruct, structure containing hermes + * representatrion of file. + * @return string encoded with process local information. + */ + std::string EncodeBlobNameLocal(HermesStruct hermes_struct); + /** + * Decodes a encoded blob name string into hermesstruct + * @param encoded_blob_name, std::string, encoded blob name string. + * @return hermes struct with blob_name, rank, offset, and size + */ + std::pair DecodeBlobNameLocal( + std::string& encoded_blob_name); +}; +} // namespace hermes::adapter::mpiio + +#endif // HERMES_MPIIO_ADAPTER_METADATA_MANAGER_H diff --git a/adapter/src/hermes/adapter/mpiio/mpiio.cc b/adapter/src/hermes/adapter/mpiio/mpiio.cc new file mode 100644 index 000000000..8086aa746 --- /dev/null +++ b/adapter/src/hermes/adapter/mpiio/mpiio.cc @@ -0,0 +1,1011 @@ +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * Distributed under BSD 3-Clause license. * + * Copyright by The HDF Group. * + * Copyright by the Illinois Institute of Technology. * + * All rights reserved. * + * * + * This file is part of Hermes. The full Hermes copyright notice, including * + * terms governing use, modification, and redistribution, is contained in * + * the COPYING file, which can be found at the top directory. If you do not * + * have access to the file, you may request a copy from help@hdfgroup.org. * + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +#include + +#include + +#include "mpi.h" +/** + * Namespace declarations + */ +using hermes::adapter::mpiio::AdapterStat; +using hermes::adapter::mpiio::FileStruct; +using hermes::adapter::mpiio::MapperFactory; +using hermes::adapter::mpiio::MetadataManager; + +namespace hapi = hermes::api; +namespace fs = std::experimental::filesystem; +/** + * Internal Functions. + */ +inline std::string GetFilenameFromFP(MPI_File *fh) { + MPI_Info info_out; + int status = MPI_File_get_info(*fh, &info_out); + if (status != MPI_SUCCESS) { + LOG(ERROR) << "MPI_File_get_info on file handler failed." 
<< std::endl; + } + const int kMaxSize = 0xFFF; + int flag; + char filename[kMaxSize]; + MPI_Info_get(info_out, "filename", kMaxSize, filename, &flag); + return filename; +} + +inline bool IsTracked(MPI_File *fh) { + if (hermes::adapter::exit) return false; + auto mdm = hermes::adapter::Singleton::GetInstance(); + auto existing = mdm->Find(fh); + return existing.second; +} + +int simple_open(MPI_Comm &comm, const char *path, int &amode, MPI_Info &info, + MPI_File *fh) { + LOG(INFO) << "Open file for filename " << path << " in mode " << amode + << std::endl; + int ret = MPI_SUCCESS; + auto mdm = hermes::adapter::Singleton::GetInstance(); + auto existing = mdm->Find(fh); + if (!existing.second) { + LOG(INFO) << "File not opened before by adapter" << std::endl; + AdapterStat stat; + stat.ref_count = 1; + stat.a_mode = amode; + MPI_File_get_size(*fh, &stat.size); + if (amode & MPI_MODE_APPEND) { + stat.ptr = stat.size; + } + stat.info = info; + stat.comm = comm; + mdm->InitializeHermes(); + hapi::Context ctx; + stat.st_bkid = std::make_shared(path, mdm->GetHermes(), ctx); + mdm->Create(fh, stat); + } else { + LOG(INFO) << "File opened before by adapter" << std::endl; + existing.first.ref_count++; + mdm->Update(fh, existing.first); + } + return ret; +} + +int open_internal(MPI_Comm &comm, const char *path, int &amode, MPI_Info &info, + MPI_File *fh) { + int ret; + MAP_OR_FAIL(MPI_File_open); + ret = real_MPI_File_open_(comm, path, amode, info, fh); + if (ret == MPI_SUCCESS) { + ret = simple_open(comm, path, amode, info, fh); + } + return ret; +} +size_t perform_file_read(const char *filename, size_t file_offset, void *ptr, + size_t ptr_offset, int count, MPI_Datatype datatype) { + LOG(INFO) << "Read called for filename from destination: " << filename + << " on offset: " << file_offset << " and count: " << count + << std::endl; + INTERCEPTOR_LIST->hermes_flush_exclusion.insert(filename); + MPI_File fh; + int status = MPI_File_open(MPI_COMM_SELF, filename, MPI_MODE_RDONLY, + MPI_INFO_NULL, &fh); + int read_size = 0; + if (status == MPI_SUCCESS) { + int status = MPI_File_seek(fh, file_offset, MPI_SEEK_SET); + if (status == MPI_SUCCESS) { + MPI_Status read_status; + status = MPI_File_read(fh, (char *)ptr + ptr_offset, count, datatype, + &read_status); + MPI_Get_count(&read_status, datatype, &read_size); + if (read_size != count) { + LOG(ERROR) << "reading failed: read " << read_size << " of " << count + << "." << std::endl; + } + } + status = MPI_File_close(&fh); + } + INTERCEPTOR_LIST->hermes_flush_exclusion.erase(filename); + return read_size; +} + +size_t perform_file_write(std::string &filename, int offset, int count, + MPI_Datatype datatype, unsigned char *data_ptr) { + LOG(INFO) << "Writing to file: " << filename << " offset: " << offset + << " of count:" << count << " datatype:" << datatype << "." + << std::endl; + INTERCEPTOR_LIST->hermes_flush_exclusion.insert(filename); + MPI_File fh; + int status = MPI_File_open(MPI_COMM_SELF, filename.c_str(), MPI_MODE_RDWR, + MPI_INFO_NULL, &fh); + int write_size = 0; + if (fh != nullptr) { + status = MPI_File_seek(fh, offset, MPI_SEEK_SET); + if (status == 0) { + MPI_Status write_status; + status = MPI_File_write(fh, data_ptr, count, datatype, &write_status); + MPI_Get_count(&write_status, datatype, &write_size); + if (write_size != count) { + LOG(ERROR) << "writing failed: wrote " << write_size << " of " << count + << "." 
<< std::endl; + } + status = MPI_File_close(&fh); + } + } + INTERCEPTOR_LIST->hermes_flush_exclusion.erase(filename); + return write_size; +} + +std::pair write_internal(std::pair &existing, + const void *ptr, int count, + MPI_Datatype datatype, MPI_File *fp, + MPI_Status *mpi_status, + bool is_collective = false) { + (void)is_collective; + LOG(INFO) << "Write called for filename: " + << existing.first.st_bkid->GetName() + << " on offset: " << existing.first.ptr << " and count: " << count + << std::endl; + size_t ret; + auto mdm = hermes::adapter::Singleton::GetInstance(); + auto mapper = MapperFactory().Get(kMapperType); + int datatype_size; + MPI_Type_size(datatype, &datatype_size); + size_t total_size = datatype_size * count; + auto mapping = mapper->map(FileStruct(fp, existing.first.ptr, total_size)); + size_t data_offset = 0; + auto filename = existing.first.st_bkid->GetName(); + LOG(INFO) << "Mapping for write has " << mapping.size() << " mapping." + << std::endl; + for (const auto &item : mapping) { + hapi::Context ctx; + auto index = std::stol(item.second.blob_name_) - 1; + unsigned char *put_data_ptr = (unsigned char *)ptr + data_offset; + size_t put_data_ptr_size = item.first.size_; + + if (item.second.size_ == kPageSize) { + LOG(INFO) << "Create or Overwrite blob " << item.second.blob_name_ + << " of size:" << item.second.size_ << "." << std::endl; + if (item.second.size_ == kPageSize) { + auto status = existing.first.st_bkid->Put( + item.second.blob_name_, put_data_ptr, put_data_ptr_size, ctx); + if (status.Failed()) { + perform_file_write(filename, item.first.offset_, put_data_ptr_size, + MPI_CHAR, put_data_ptr); + } else { + existing.first.st_blobs.emplace(item.second.blob_name_); + } + // TODO(chogan): The commented out branches are unreachable. Hari needs to + // take a look at this + } +#if 0 + } else if (item.second.offset_ == 0) { + auto status = existing.first.st_bkid->Put( + item.second.blob_name_, put_data_ptr, put_data_ptr_size, ctx); + if (status.Failed()) { + perform_file_write(filename, index * kPageSize, put_data_ptr_size, + MPI_CHAR, put_data_ptr); + } else { + existing.first.st_blobs.emplace(item.second.blob_name_); + } + } else { + hapi::Blob final_data(item.second.offset_ + item.second.size_); + if (fs::exists(filename) && + fs::file_size(filename) >= item.second.offset_) { + LOG(INFO) << "Blob has a gap in write. read gap from original file." + << std::endl; + perform_file_read(filename.c_str(), index * kPageSize, + final_data.data(), 0, item.second.offset_, + MPI_CHAR); + } + memcpy(final_data.data() + item.second.offset_, put_data_ptr, + put_data_ptr_size); + auto status = existing.first.st_bkid->Put(item.second.blob_name_, + final_data, ctx); + if (status.Failed()) { + perform_file_write(filename, index * kPageSize, final_data.size(), + MPI_CHAR, final_data.data()); + } else { + existing.first.st_blobs.emplace(item.second.blob_name_); + } + } +#endif + /* TODO(hari): Check if vbucket exists. if so delete it.*/ + } else { + LOG(INFO) << "Writing blob " << item.second.blob_name_ + << " of size:" << item.second.size_ << "." << std::endl; + auto blob_exists = + existing.first.st_bkid->ContainsBlob(item.second.blob_name_); + hapi::Blob temp(0); + auto existing_blob_size = + existing.first.st_bkid->Get(item.second.blob_name_, temp, ctx); + if (blob_exists) { + LOG(INFO) << "blob " << item.second.blob_name_ + << " of size:" << existing_blob_size << "." 
<< std::endl; + if (existing_blob_size != kPageSize) { + LOG(ERROR) << "blob " << item.second.blob_name_ + << " has of size:" << existing_blob_size + << " of the overall page." << std::endl; + } + hapi::Blob existing_data(existing_blob_size); + existing.first.st_bkid->Get(item.second.blob_name_, existing_data, ctx); + memcpy(existing_data.data() + item.second.offset_, put_data_ptr, + put_data_ptr_size); + auto status = existing.first.st_bkid->Put(item.second.blob_name_, + existing_data, ctx); + if (status.Failed()) { + perform_file_write(filename, index * kPageSize, existing_blob_size, + MPI_CHAR, existing_data.data()); + } else { + existing.first.st_blobs.emplace(item.second.blob_name_); + } + } else { + std::string process_local_blob_name = + mdm->EncodeBlobNameLocal(item.second); + auto vbucket = + hapi::VBucket(item.second.blob_name_, mdm->GetHermes(), false); + existing.first.st_vbuckets.emplace(item.second.blob_name_); + auto blob_names = vbucket.GetLinks(ctx); + LOG(INFO) << "vbucket with blobname " << item.second.blob_name_ + << " does not exists." << std::endl; + auto status = existing.first.st_bkid->Put( + process_local_blob_name, put_data_ptr, put_data_ptr_size, ctx); + if (status.Failed()) { + perform_file_write(filename, item.first.offset_, put_data_ptr_size, + MPI_CHAR, put_data_ptr); + } else { + existing.first.st_blobs.emplace(process_local_blob_name); + vbucket.Link(process_local_blob_name, + existing.first.st_bkid->GetName()); + } + if (!blob_names.empty()) { + LOG(INFO) << "vbucket with blobname " << item.second.blob_name_ + << " exists." << std::endl; + for (auto &blob_name : blob_names) { + auto hermes_struct = mdm->DecodeBlobNameLocal(blob_name); + if (((hermes_struct.second.offset_ < item.second.offset_) && + (hermes_struct.second.offset_ + hermes_struct.second.size_ > + item.second.offset_))) { + // partially contained second half + hapi::Blob existing_data(item.second.offset_ - + hermes_struct.second.offset_); + existing.first.st_bkid->Get(item.second.blob_name_, existing_data, + ctx); + status = existing.first.st_bkid->Put(item.second.blob_name_, + existing_data, ctx); + if (status.Failed()) { + LOG(ERROR) << "Put Failed on adapter." << std::endl; + } + } else if (item.second.offset_ < hermes_struct.second.offset_ && + item.second.offset_ + item.second.size_ > + hermes_struct.second.offset_) { + // partially contained first half + hapi::Blob existing_data(hermes_struct.second.size_); + existing.first.st_bkid->Get(item.second.blob_name_, existing_data, + ctx); + existing_data.erase( + existing_data.begin(), + existing_data.begin() + + (hermes_struct.second.offset_ - item.second.offset_)); + status = existing.first.st_bkid->Put(item.second.blob_name_, + existing_data, ctx); + if (status.Failed()) { + LOG(ERROR) << "Put Failed on adapter." << std::endl; + } + } else if (hermes_struct.second.offset_ > item.second.offset_ && + hermes_struct.second.offset_ + + hermes_struct.second.size_ < + item.second.offset_ + item.second.size_) { + // fully contained + status = + existing.first.st_bkid->DeleteBlob(item.second.blob_name_); + if (status.Failed()) { + LOG(ERROR) << "Delete blob Failed on adapter." << std::endl; + } + existing.first.st_blobs.erase(item.second.blob_name_); + } else { + // no overlap + } + } + } + } + } + data_offset += item.first.size_; + } + existing.first.ptr += data_offset; + existing.first.size = existing.first.size >= existing.first.ptr + ? 
existing.first.size + : existing.first.ptr; + mdm->Update(fp, existing.first); + ret = data_offset; + mpi_status->count_hi_and_cancelled = 0; + mpi_status->count_lo = ret; + return std::pair(MPI_SUCCESS, ret); +} + +std::pair read_internal(std::pair &existing, + void *ptr, int count, + MPI_Datatype datatype, MPI_File *fp, + MPI_Status *mpi_status, + bool is_collective = false) { + (void)is_collective; + LOG(INFO) << "Read called for filename: " << existing.first.st_bkid->GetName() + << " on offset: " << existing.first.ptr << " and size: " << count + << std::endl; + if (existing.first.ptr >= existing.first.size) { + mpi_status->count_hi_and_cancelled = 0; + mpi_status->count_lo = 0; + return std::pair(MPI_SUCCESS, 0); + } + int datatype_size; + MPI_Type_size(datatype, &datatype_size); + size_t total_size = datatype_size * count; + auto mdm = hermes::adapter::Singleton::GetInstance(); + auto mapper = MapperFactory().Get(kMapperType); + auto mapping = mapper->map(FileStruct(fp, existing.first.ptr, total_size)); + size_t total_read_size = 0; + auto filename = existing.first.st_bkid->GetName(); + LOG(INFO) << "Mapping for read has " << mapping.size() << " mapping." + << std::endl; + for (const auto &item : mapping) { + hapi::Context ctx; + auto blob_exists = + existing.first.st_bkid->ContainsBlob(item.second.blob_name_); + hapi::Blob read_data(0); + size_t read_size = 0; + if (blob_exists) { + LOG(INFO) << "Blob exists and need to read from Hermes from blob: " + << item.second.blob_name_ << "." << std::endl; + auto exiting_blob_size = + existing.first.st_bkid->Get(item.second.blob_name_, read_data, ctx); + + read_data.resize(exiting_blob_size); + existing.first.st_bkid->Get(item.second.blob_name_, read_data, ctx); + bool contains_blob = exiting_blob_size > item.second.offset_; + if (contains_blob) { + read_size = read_data.size() < item.second.offset_ + item.second.size_ + ? exiting_blob_size - item.second.offset_ + : item.second.size_; + LOG(INFO) << "Blob have data and need to read from hemes " + "blob: " + << item.second.blob_name_ << " offset:" << item.second.offset_ + << " size:" << read_size << "." << std::endl; + memcpy((char *)ptr + total_read_size, + read_data.data() + item.second.offset_, read_size); + if (read_size < item.second.size_) { + contains_blob = true; + } else { + contains_blob = false; + } + } else { + LOG(INFO) << "Blob does not have data and need to read from original " + "filename: " + << filename << " offset:" << item.first.offset_ + << " size:" << item.first.size_ << "." << std::endl; + auto file_read_size = perform_file_read( + filename.c_str(), item.first.offset_, ptr, total_read_size, + (int)item.second.size_, MPI_CHAR); + read_size += file_read_size; + } + if (contains_blob && fs::exists(filename) && + fs::file_size(filename) >= item.first.offset_ + item.first.size_) { + LOG(INFO) << "Blob does not have data and need to read from original " + "filename: " + << filename << " offset:" << item.first.offset_ + read_size + << " size:" << item.second.size_ - read_size << "." + << std::endl; + auto new_read_size = + perform_file_read(filename.c_str(), item.first.offset_, ptr, + total_read_size + read_size, + item.second.size_ - read_size, MPI_CHAR); + read_size += new_read_size; + } + } else { + hapi::VBucket vbucket(item.second.blob_name_, mdm->GetHermes(), false); + auto blob_names = vbucket.GetLinks(ctx); + if (!blob_names.empty()) { + LOG(INFO) << "vbucket with blobname " << item.second.blob_name_ + << " exists." 
<< std::endl; + for (auto &blob_name : blob_names) { + auto hermes_struct = mdm->DecodeBlobNameLocal(blob_name); + if (((hermes_struct.second.offset_ <= item.second.offset_) && + (hermes_struct.second.offset_ + hermes_struct.second.size_ >= + item.second.offset_) && + (hermes_struct.second.size_ - item.second.offset_ - + hermes_struct.second.offset_ > + 0))) { + // partially contained second half + hapi::Blob existing_data(hermes_struct.second.size_); + existing.first.st_bkid->Get(hermes_struct.second.encoded_blob_name_, + existing_data, ctx); + auto offset_to_cp = + item.second.offset_ - hermes_struct.second.offset_; + memcpy((char *)ptr + (item.first.offset_ - existing.first.ptr), + existing_data.data() + offset_to_cp, + hermes_struct.second.size_ - offset_to_cp); + read_size += hermes_struct.second.size_ - offset_to_cp; + } else if (item.second.offset_ < hermes_struct.second.offset_ && + item.second.offset_ + item.second.size_ > + hermes_struct.second.offset_) { + // partially contained first half + hapi::Blob existing_data(hermes_struct.second.size_); + existing.first.st_bkid->Get(hermes_struct.second.encoded_blob_name_, + existing_data, ctx); + memcpy((char *)ptr + (item.first.offset_ - existing.first.ptr), + existing_data.data(), + (hermes_struct.second.offset_ - item.second.offset_)); + read_size += hermes_struct.second.offset_ - item.second.offset_; + } else if (hermes_struct.second.offset_ > item.second.offset_ && + hermes_struct.second.offset_ + hermes_struct.second.size_ < + item.second.offset_ + item.second.size_) { + // fully contained + hapi::Blob existing_data(hermes_struct.second.size_); + existing.first.st_bkid->Get(hermes_struct.second.encoded_blob_name_, + existing_data, ctx); + memcpy((char *)ptr + (item.first.offset_ - existing.first.ptr), + existing_data.data(), hermes_struct.second.size_); + read_size += hermes_struct.second.size_; + } else { + // no overlap + } + if (read_size == item.second.size_) break; + } + } else if (fs::exists(filename) && + fs::file_size(filename) >= + item.first.offset_ + item.first.size_) { + LOG(INFO) << "Blob does not exists and need to read from original " + "filename: " + << filename << " offset:" << item.first.offset_ + << " size:" << item.first.size_ << "." << std::endl; + read_size = + perform_file_read(filename.c_str(), item.first.offset_, ptr, + total_read_size, item.first.size_, MPI_CHAR); + } + } + + if (read_size > 0) { + total_read_size += read_size; + } + } + existing.first.ptr += total_read_size; + mdm->Update(fp, existing.first); + mpi_status->count_hi_and_cancelled = 0; + mpi_status->count_lo = total_read_size; + return std::pair(MPI_SUCCESS, total_read_size); +} +/** + * MPI + */ +int HERMES_DECL(MPI_Init)(int *argc, char ***argv) { + MAP_OR_FAIL(MPI_Init); + int status = real_MPI_Init_(argc, argv); + if (status == 0) { + LOG(INFO) << "MPI Init intercepted." << std::endl; + auto mdm = hermes::adapter::Singleton::GetInstance(); + mdm->InitializeHermes(); + } + return status; +} + +int HERMES_DECL(MPI_Finalize)(void) { + LOG(INFO) << "MPI Finalize intercepted." 
<< std::endl; + auto mdm = hermes::adapter::Singleton::GetInstance(); + mdm->FinalizeHermes(); + MAP_OR_FAIL(MPI_Finalize); + int status = real_MPI_Finalize_(); + return status; +} +/** + * Metadata functions + */ +int HERMES_DECL(MPI_File_open)(MPI_Comm comm, const char *filename, int amode, + MPI_Info info, MPI_File *fh) { + int status; + if (hermes::adapter::IsTracked(filename)) { + LOG(INFO) << "Intercept MPI_File_open for filename: " << filename + << " and mode: " << amode << " is tracked." << std::endl; + status = open_internal(comm, filename, amode, info, fh); + } else { + MAP_OR_FAIL(MPI_File_open); + status = real_MPI_File_open_(comm, filename, amode, info, fh); + } + return (status); +} + +int HERMES_DECL(MPI_File_close)(MPI_File *fh) { + int ret; + if (IsTracked(fh)) { + LOG(INFO) << "Intercept MPI_File_close." << std::endl; + auto mdm = hermes::adapter::Singleton::GetInstance(); + auto existing = mdm->Find(fh); + if (existing.second) { + MPI_Barrier(existing.first.comm); + LOG(INFO) << "File handler is opened by adapter." << std::endl; + hapi::Context ctx; + if (existing.first.ref_count == 1) { + auto filename = existing.first.st_bkid->GetName(); + auto persist = INTERCEPTOR_LIST->Persists(filename); + mdm->Delete(fh); + const auto &blob_names = existing.first.st_blobs; + if (!blob_names.empty() && persist) { + LOG(INFO) << "Adapter flushes " << blob_names.size() + << " blobs to filename:" << filename << "." << std::endl; + INTERCEPTOR_LIST->hermes_flush_exclusion.insert(filename); + hermes::api::VBucket file_vbucket(filename, mdm->GetHermes(), true, + ctx); + auto offset_map = std::unordered_map(); + + for (auto blob_name : blob_names) { + auto h_struct = mdm->DecodeBlobNameLocal(blob_name); + auto status = file_vbucket.Link(blob_name, filename, ctx); + if (!status.Failed()) { + if (h_struct.first == -1) { + auto page_index = std::stol(blob_name) - 1; + offset_map.emplace(blob_name, page_index * kPageSize); + } else { + auto page_index = std::stol(h_struct.second.blob_name_) - 1; + offset_map.emplace(blob_name, page_index * kPageSize + + h_struct.second.offset_); + } + } + } + auto trait = hermes::api::FileMappingTrait(filename, offset_map, + nullptr, NULL, NULL); + file_vbucket.Attach(&trait, ctx); + file_vbucket.Destroy(ctx); + for (const auto &vbucket : existing.first.st_vbuckets) { + hermes::api::VBucket blob_vbucket(vbucket, mdm->GetHermes(), false, + ctx); + auto blob_names_v = blob_vbucket.GetLinks(ctx); + for (auto &blob_name : blob_names_v) { + blob_vbucket.Unlink(blob_name, existing.first.st_bkid->GetName()); + } + auto blob_names_temp = blob_vbucket.GetLinks(ctx); + blob_vbucket.Destroy(ctx); + } + for (auto &blob_name : existing.first.st_blobs) { + existing.first.st_bkid->DeleteBlob(blob_name); + } + existing.first.st_blobs.clear(); + INTERCEPTOR_LIST->hermes_flush_exclusion.erase(filename); + } + existing.first.st_bkid->Destroy(ctx); + mdm->FinalizeHermes(); + if (existing.first.a_mode & MPI_MODE_DELETE_ON_CLOSE) { + fs::remove(filename); + } + + } else { + LOG(INFO) << "File handler is opened by more than one open." 
+ << std::endl; + existing.first.ref_count--; + mdm->Update(fh, existing.first); + existing.first.st_bkid->Release(ctx); + } + MPI_Barrier(existing.first.comm); + ret = MPI_SUCCESS; + } else { + MAP_OR_FAIL(MPI_File_close); + ret = real_MPI_File_close_(fh); + } + } else { + MAP_OR_FAIL(MPI_File_close); + ret = real_MPI_File_close_(fh); + } + return (ret); +} + +int HERMES_DECL(MPI_File_seek_shared)(MPI_File fh, MPI_Offset offset, + int whence) { + int ret; + if (IsTracked(&fh)) { + LOG(INFO) << "Intercept MPI_File_seek_shared offset:" << offset + << " whence:" << whence << "." << std::endl; + auto mdm = hermes::adapter::Singleton::GetInstance(); + auto existing = mdm->Find(&fh); + MPI_Offset sum_offset; + int sum_whence; + int comm_participators; + MPI_Comm_size(existing.first.comm, &comm_participators); + MPI_Allreduce(&offset, &sum_offset, 1, MPI_LONG_LONG_INT, MPI_SUM, + existing.first.comm); + MPI_Allreduce(&whence, &sum_whence, 1, MPI_INT, MPI_SUM, + existing.first.comm); + if (sum_offset / comm_participators != offset) { + LOG(ERROR) + << "Same offset should be passed across the opened file communicator." + << std::endl; + } + if (sum_whence / comm_participators != whence) { + LOG(ERROR) + << "Same whence should be passed across the opened file communicator." + << std::endl; + } + ret = MPI_File_seek(fh, offset, whence); + } else { + MAP_OR_FAIL(MPI_File_seek_shared); + ret = real_MPI_File_seek_shared_(fh, offset, whence); + } + return (ret); +} + +int HERMES_DECL(MPI_File_seek)(MPI_File fh, MPI_Offset offset, int whence) { + int ret = -1; + if (IsTracked(&fh)) { + auto mdm = hermes::adapter::Singleton::GetInstance(); + auto existing = mdm->Find(&fh); + if (existing.second) { + LOG(INFO) << "Intercept fseek offset:" << offset << " whence:" << whence + << "." << std::endl; + if (!(existing.first.a_mode & MPI_MODE_APPEND)) { + switch (whence) { + case MPI_SEEK_SET: { + existing.first.ptr = offset; + break; + } + case MPI_SEEK_CUR: { + existing.first.ptr += offset; + break; + } + case MPI_SEEK_END: { + existing.first.ptr = existing.first.size + offset; + break; + } + default: { + // TODO(hari): throw not implemented error. + } + } + mdm->Update(&fh, existing.first); + ret = 0; + } else { + LOG(INFO) + << "File pointer not updating as file was opened in append mode." + << std::endl; + ret = -1; + } + } else { + MAP_OR_FAIL(MPI_File_seek); + ret = real_MPI_File_seek_(fh, offset, whence); + } + } else { + MAP_OR_FAIL(MPI_File_seek); + ret = real_MPI_File_seek_(fh, offset, whence); + } + return (ret); +} +int HERMES_DECL(MPI_File_get_position)(MPI_File fh, MPI_Offset *offset) { + int ret = -1; + if (IsTracked(&fh)) { + auto mdm = hermes::adapter::Singleton::GetInstance(); + auto existing = mdm->Find(&fh); + *offset = existing.first.ptr; + ret = MPI_SUCCESS; + } else { + MAP_OR_FAIL(MPI_File_get_position); + ret = real_MPI_File_get_position_(fh, offset); + } + return ret; +} +/** + * Sync Read/Write + */ +int HERMES_DECL(MPI_File_read_all)(MPI_File fh, void *buf, int count, + MPI_Datatype datatype, MPI_Status *status) { + int ret; + if (IsTracked(&fh)) { + auto mdm = hermes::adapter::Singleton::GetInstance(); + auto existing = mdm->Find(&fh); + if (existing.second) { + MPI_Barrier(existing.first.comm); + LOG(INFO) << "Intercept MPI_File_read_all." 
<< std::endl; + auto read_ret = + read_internal(existing, buf, count, datatype, &fh, status, true); + ret = read_ret.first; + MPI_Barrier(existing.first.comm); + } else { + MAP_OR_FAIL(MPI_File_read_all); + ret = real_MPI_File_read_all_(fh, buf, count, datatype, status); + } + } else { + MAP_OR_FAIL(MPI_File_read_all); + ret = real_MPI_File_read_all_(fh, buf, count, datatype, status); + } + return (ret); +} +int HERMES_DECL(MPI_File_read_at_all)(MPI_File fh, MPI_Offset offset, void *buf, + int count, MPI_Datatype datatype, + MPI_Status *status) { + int ret; + if (IsTracked(&fh)) { + ret = MPI_File_seek(fh, offset, MPI_SEEK_SET); + if (ret == MPI_SUCCESS) { + ret = MPI_File_read_all(fh, buf, count, datatype, status); + } + } else { + MAP_OR_FAIL(MPI_File_read_at_all); + ret = real_MPI_File_read_at_all_(fh, offset, buf, count, datatype, status); + } + return ret; +} +int HERMES_DECL(MPI_File_read_at)(MPI_File fh, MPI_Offset offset, void *buf, + int count, MPI_Datatype datatype, + MPI_Status *status) { + int ret; + if (IsTracked(&fh)) { + ret = MPI_File_seek(fh, offset, MPI_SEEK_SET); + if (ret == MPI_SUCCESS) { + ret = MPI_File_read(fh, buf, count, datatype, status); + } + } else { + MAP_OR_FAIL(MPI_File_read_at); + ret = real_MPI_File_read_at_(fh, offset, buf, count, datatype, status); + } + return ret; +} +int HERMES_DECL(MPI_File_read)(MPI_File fh, void *buf, int count, + MPI_Datatype datatype, MPI_Status *status) { + int ret; + if (IsTracked(&fh)) { + auto mdm = hermes::adapter::Singleton::GetInstance(); + auto existing = mdm->Find(&fh); + if (existing.second) { + LOG(INFO) << "Intercept MPI_File_read." << std::endl; + auto read_ret = + read_internal(existing, buf, count, datatype, &fh, status); + ret = read_ret.first; + } else { + MAP_OR_FAIL(MPI_File_read); + ret = real_MPI_File_read_(fh, buf, count, datatype, status); + } + } else { + MAP_OR_FAIL(MPI_File_read); + ret = real_MPI_File_read_(fh, buf, count, datatype, status); + } + return (ret); +} +int HERMES_DECL(MPI_File_read_ordered)(MPI_File fh, void *buf, int count, + MPI_Datatype datatype, + MPI_Status *status) { + int ret; + if (IsTracked(&fh)) { + auto mdm = hermes::adapter::Singleton::GetInstance(); + auto existing = mdm->Find(&fh); + int total; + MPI_Scan(&count, &total, 1, MPI_INT, MPI_SUM, existing.first.comm); + MPI_Offset my_offset = total - count; + ret = MPI_File_read_at_all(fh, my_offset, buf, count, datatype, status); + } else { + MAP_OR_FAIL(MPI_File_read_ordered); + ret = real_MPI_File_read_ordered_(fh, buf, count, datatype, status); + } + return (ret); +} +int HERMES_DECL(MPI_File_read_shared)(MPI_File fh, void *buf, int count, + MPI_Datatype datatype, + MPI_Status *status) { + int ret; + if (IsTracked(&fh)) { + ret = MPI_File_read(fh, buf, count, datatype, status); + } else { + MAP_OR_FAIL(MPI_File_read_shared); + ret = real_MPI_File_read_shared_(fh, buf, count, datatype, status); + } + return ret; +} +int HERMES_DECL(MPI_File_write_all)(MPI_File fh, const void *buf, int count, + MPI_Datatype datatype, MPI_Status *status) { + int ret; + if (IsTracked(&fh)) { + auto mdm = hermes::adapter::Singleton::GetInstance(); + auto existing = mdm->Find(&fh); + if (existing.second) { + MPI_Barrier(existing.first.comm); + LOG(INFO) << "Intercept MPI_File_write." 
<< std::endl; + auto write_ret = + write_internal(existing, buf, count, datatype, &fh, status, true); + ret = write_ret.first; + MPI_Barrier(existing.first.comm); + } else { + MAP_OR_FAIL(MPI_File_write); + ret = real_MPI_File_write_(fh, buf, count, datatype, status); + } + } else { + MAP_OR_FAIL(MPI_File_write); + ret = real_MPI_File_write_(fh, buf, count, datatype, status); + } + return (ret); +} +int HERMES_DECL(MPI_File_write_at_all)(MPI_File fh, MPI_Offset offset, + const void *buf, int count, + MPI_Datatype datatype, + MPI_Status *status) { + int ret; + if (IsTracked(&fh)) { + ret = MPI_File_seek(fh, offset, MPI_SEEK_SET); + if (ret == MPI_SUCCESS) { + ret = MPI_File_write_all(fh, buf, count, datatype, status); + } + } else { + MAP_OR_FAIL(MPI_File_write_at_all); + ret = real_MPI_File_write_at_all_(fh, offset, buf, count, datatype, status); + } + return ret; +} +int HERMES_DECL(MPI_File_write_at)(MPI_File fh, MPI_Offset offset, + const void *buf, int count, + MPI_Datatype datatype, MPI_Status *status) { + int ret; + if (IsTracked(&fh)) { + ret = MPI_File_seek(fh, offset, MPI_SEEK_SET); + if (ret == MPI_SUCCESS) { + ret = MPI_File_write(fh, buf, count, datatype, status); + } + } else { + MAP_OR_FAIL(MPI_File_write_at); + ret = real_MPI_File_write_at_(fh, offset, buf, count, datatype, status); + } + return ret; +} +int HERMES_DECL(MPI_File_write)(MPI_File fh, const void *buf, int count, + MPI_Datatype datatype, MPI_Status *status) { + int ret; + if (IsTracked(&fh)) { + auto mdm = hermes::adapter::Singleton::GetInstance(); + auto existing = mdm->Find(&fh); + if (existing.second) { + LOG(INFO) << "Intercept MPI_File_write." << std::endl; + auto write_ret = + write_internal(existing, buf, count, datatype, &fh, status, false); + ret = write_ret.first; + } else { + MAP_OR_FAIL(MPI_File_write); + ret = real_MPI_File_write_(fh, buf, count, datatype, status); + } + } else { + MAP_OR_FAIL(MPI_File_write); + ret = real_MPI_File_write_(fh, buf, count, datatype, status); + } + return (ret); +} +int HERMES_DECL(MPI_File_write_ordered)(MPI_File fh, const void *buf, int count, + MPI_Datatype datatype, + MPI_Status *status) { + int ret; + if (IsTracked(&fh)) { + auto mdm = hermes::adapter::Singleton::GetInstance(); + auto existing = mdm->Find(&fh); + int total; + MPI_Scan(&count, &total, 1, MPI_INT, MPI_SUM, existing.first.comm); + MPI_Offset my_offset = total - count; + ret = MPI_File_write_at_all(fh, my_offset, buf, count, datatype, status); + } else { + MAP_OR_FAIL(MPI_File_write_ordered); + ret = real_MPI_File_write_ordered_(fh, buf, count, datatype, status); + } + return (ret); +} +int HERMES_DECL(MPI_File_write_shared)(MPI_File fh, const void *buf, int count, + MPI_Datatype datatype, + MPI_Status *status) { + int ret; + if (IsTracked(&fh)) { + ret = MPI_File_write_ordered(fh, buf, count, datatype, status); + } else { + MAP_OR_FAIL(MPI_File_write_shared); + ret = real_MPI_File_write_shared_(fh, buf, count, datatype, status); + } + return ret; +} +/** + * Async Read/Write + */ +int HERMES_DECL(MPI_File_iread_at)(MPI_File fh, MPI_Offset offset, void *buf, + int count, MPI_Datatype datatype, + MPI_Request *request) { + (void)fh; + (void)buf; + (void)count; + (void)datatype; + (void)request; + (void)offset; + return 0; +} +int HERMES_DECL(MPI_File_iread)(MPI_File fh, void *buf, int count, + MPI_Datatype datatype, MPI_Request *request) { + (void)fh; + (void)buf; + (void)count; + (void)datatype; + (void)request; + return 0; +} +int HERMES_DECL(MPI_File_iread_shared)(MPI_File fh, void *buf, int count, + 
MPI_Datatype datatype, + MPI_Request *request) { + (void)fh; + (void)buf; + (void)count; + (void)datatype; + (void)request; + return 0; +} +int HERMES_DECL(MPI_File_iwrite_at)(MPI_File fh, MPI_Offset offset, + const void *buf, int count, + MPI_Datatype datatype, + MPI_Request *request) { + (void)fh; + (void)buf; + (void)count; + (void)datatype; + (void)request; + (void)offset; + return 0; +} +int HERMES_DECL(MPI_File_iwrite)(MPI_File fh, const void *buf, int count, + MPI_Datatype datatype, MPI_Request *request) { + return 0; + (void)fh; + (void)buf; + (void)count; + (void)datatype; + (void)request; +} +int HERMES_DECL(MPI_File_iwrite_shared)(MPI_File fh, const void *buf, int count, + MPI_Datatype datatype, + MPI_Request *request) { + (void)fh; + (void)buf; + (void)count; + (void)datatype; + (void)request; + return 0; +} + +/** + * Other functions + */ +int HERMES_DECL(MPI_File_sync)(MPI_File fh) { + int ret = -1; + if (IsTracked(&fh)) { + auto mdm = hermes::adapter::Singleton::GetInstance(); + auto existing = mdm->Find(&fh); + if (existing.second) { + LOG(INFO) << "Intercept MPI_File_sync." << std::endl; + auto filename = existing.first.st_bkid->GetName(); + const auto &blob_names = existing.first.st_blobs; + if (!blob_names.empty() && INTERCEPTOR_LIST->Persists(filename)) { + INTERCEPTOR_LIST->hermes_flush_exclusion.insert(filename); + LOG(INFO) << "File handler is opened by adapter." << std::endl; + hapi::Context ctx; + LOG(INFO) << "Adapter flushes " << blob_names.size() + << " blobs to filename:" << filename << "." << std::endl; + hermes::api::VBucket file_vbucket(filename, mdm->GetHermes(), true, + ctx); + auto offset_map = std::unordered_map(); + for (const auto &blob_name : blob_names) { + file_vbucket.Link(blob_name, filename, ctx); + auto page_index = std::stol(blob_name) - 1; + offset_map.emplace(blob_name, page_index * kPageSize); + } + auto trait = hermes::api::FileMappingTrait(filename, offset_map, + nullptr, NULL, NULL); + file_vbucket.Attach(&trait, ctx); + file_vbucket.Destroy(ctx); + existing.first.st_blobs.clear(); + INTERCEPTOR_LIST->hermes_flush_exclusion.erase(filename); + } + ret = 0; + } + } else { + MAP_OR_FAIL(MPI_File_sync); + ret = real_MPI_File_sync_(fh); + } + return (ret); +} diff --git a/adapter/src/hermes/adapter/stdio/stdio.cc b/adapter/src/hermes/adapter/stdio/stdio.cc index 9c59a0b6d..0a2ec382e 100644 --- a/adapter/src/hermes/adapter/stdio/stdio.cc +++ b/adapter/src/hermes/adapter/stdio/stdio.cc @@ -19,7 +19,6 @@ #include using hermes::adapter::stdio::AdapterStat; -using hermes::adapter::stdio::FileID; using hermes::adapter::stdio::FileStruct; using hermes::adapter::stdio::MapperFactory; using hermes::adapter::stdio::MetadataManager; @@ -561,7 +560,7 @@ int HERMES_DECL(fflush)(FILE *fp) { auto trait = hermes::api::FileMappingTrait(filename, offset_map, nullptr, NULL, NULL); file_vbucket.Attach(&trait, ctx); - file_vbucket.Delete(ctx); + file_vbucket.Destroy(ctx); existing.first.st_blobs.clear(); INTERCEPTOR_LIST->hermes_flush_exclusion.erase(filename); } @@ -606,7 +605,7 @@ int HERMES_DECL(fclose)(FILE *fp) { auto trait = hermes::api::FileMappingTrait(filename, offset_map, nullptr, NULL, NULL); file_vbucket.Attach(&trait, ctx); - file_vbucket.Delete(ctx); + file_vbucket.Destroy(ctx); existing.first.st_blobs.clear(); INTERCEPTOR_LIST->hermes_flush_exclusion.erase(filename); } diff --git a/adapter/test/CMakeLists.txt b/adapter/test/CMakeLists.txt index f8600640d..e84f471d5 100644 --- a/adapter/test/CMakeLists.txt +++ b/adapter/test/CMakeLists.txt @@ -39,7 
+39,8 @@ function(mpi_daemon test_exec test_mpi_proc test_args arg_name daemon_procs) ${CMAKE_SOURCE_DIR}/adapter/test/run_hermes.sh ${MPIEXEC_EXECUTABLE} ${CMAKE_BINARY_DIR}/bin/${test_exec} ${test_mpi_proc} ${CMAKE_BINARY_DIR}/bin/hermes_daemon ${daemon_procs} - ${CMAKE_SOURCE_DIR}/adapter/test/data/hermes.conf) + ${CMAKE_SOURCE_DIR}/adapter/test/data/hermes.conf + ${test_args}) set_property(TEST Test${test_exec}_${test_mpi_proc}_${arg_name} PROPERTY ENVIRONMENT HERMES_CONF=${CMAKE_SOURCE_DIR}/adapter/test/data/hermes.conf) set_property(TEST Test${test_exec}_${test_mpi_proc}_${arg_name} APPEND diff --git a/adapter/test/data/hermes.conf b/adapter/test/data/hermes.conf index 301ffe0e1..7a6bd0960 100644 --- a/adapter/test/data/hermes.conf +++ b/adapter/test/data/hermes.conf @@ -30,7 +30,7 @@ transfer_window_arena_percentage = 0.08; transient_arena_percentage = 0.03; max_buckets_per_node = 16; -max_vbuckets_per_node = 8; +max_vbuckets_per_node = 32; system_view_state_update_interval_ms = 1000; mount_points = {"", "./", "./", "./"}; diff --git a/adapter/test/mpiio/CMakeLists.txt b/adapter/test/mpiio/CMakeLists.txt index 314c4dd49..fad93fc77 100644 --- a/adapter/test/mpiio/CMakeLists.txt +++ b/adapter/test/mpiio/CMakeLists.txt @@ -5,4 +5,15 @@ #------------------------------------------------------------------------------ add_executable(mpiio_adapter_test mpiio_adapter_test.cpp ${ADAPTER_COMMON}) target_link_libraries(mpiio_adapter_test Catch2::Catch2 -lstdc++fs -lc MPI::MPI_CXX) -mpi(mpiio_adapter_test 2 "") \ No newline at end of file +mpi(mpiio_adapter_test 2 "") + +add_executable(hermes_mpiio_adapter_test mpiio_adapter_test.cpp ${ADAPTER_COMMON}) +target_link_libraries(hermes_mpiio_adapter_test Catch2::Catch2 -lstdc++fs -lc MPI::MPI_CXX) +target_link_libraries(hermes_mpiio_adapter_test hermes_mpiio) +add_dependencies(hermes_mpiio_adapter_test hermes_mpiio) +add_dependencies(hermes_mpiio_adapter_test hermes_daemon) +set_target_properties(hermes_mpiio_adapter_test PROPERTIES COMPILE_FLAGS "-DHERMES_INTERCEPT=1") + +mpi_daemon(hermes_mpiio_adapter_test 2 "[synchronicity=sync][operation=single_read]" "syncread" 1) +mpi_daemon(hermes_mpiio_adapter_test 2 "[synchronicity=sync][operation=single_write]" "syncwrite" 1) +mpi_daemon(hermes_mpiio_adapter_test 2 "[operation=single_open]" "open" 1) \ No newline at end of file diff --git a/adapter/test/mpiio/mpiio_adapter_basic_test.cpp b/adapter/test/mpiio/mpiio_adapter_basic_test.cpp index 41bf2cd37..b9a962247 100644 --- a/adapter/test/mpiio/mpiio_adapter_basic_test.cpp +++ b/adapter/test/mpiio/mpiio_adapter_basic_test.cpp @@ -172,12 +172,15 @@ TEST_CASE("OpenCollective", "[process=" + std::to_string(info.comm_size) + REQUIRE(test::status_orig == MPI_SUCCESS); } - SECTION("delete on close mode on shared file") { + SECTION("delete on close mode on new file") { test::test_open( - info.new_file.c_str(), + info.shared_new_file.c_str(), MPI_MODE_WRONLY | MPI_MODE_CREATE | MPI_MODE_DELETE_ON_CLOSE, MPI_COMM_WORLD); - REQUIRE(test::status_orig != MPI_SUCCESS); + REQUIRE(test::status_orig == MPI_SUCCESS); + test::test_close(); + REQUIRE(test::status_orig == MPI_SUCCESS); + REQUIRE(!fs::exists(info.shared_new_file.c_str())); } posttest(); } @@ -304,7 +307,6 @@ TEST_CASE("SingleWrite", "[process=" + std::to_string(info.comm_size) + test::test_write(info.write_data.c_str(), args.request_size, MPI_CHAR); REQUIRE((size_t)test::size_written_orig == args.request_size); REQUIRE(fs::exists(info.new_file.c_str())); - REQUIRE(fs::file_size(info.new_file) == 
(size_t)test::size_written_orig); test::test_close(); REQUIRE(!fs::exists(info.new_file.c_str())); REQUIRE(test::status_orig == MPI_SUCCESS); @@ -483,8 +485,6 @@ TEST_CASE("SingleWriteCollective", REQUIRE((size_t)test::size_written_orig == args.request_size); REQUIRE(fs::exists(info.shared_new_file.c_str())); MPI_Barrier(MPI_COMM_WORLD); - REQUIRE(fs::file_size(info.shared_new_file) == - (size_t)test::size_written_orig * info.comm_size); test::test_close(); REQUIRE(!fs::exists(info.shared_new_file.c_str())); REQUIRE(test::status_orig == MPI_SUCCESS); @@ -926,21 +926,6 @@ TEST_CASE("SingleReadCollective", "[process=" + std::to_string(info.comm_size) + REQUIRE(test::status_orig == MPI_SUCCESS); } - SECTION("read at the end of existing file") { - test::test_open(info.shared_existing_file.c_str(), MPI_MODE_RDONLY, - MPI_COMM_WORLD); - REQUIRE(test::status_orig == MPI_SUCCESS); - test::test_seek(0, MPI_SEEK_END); - REQUIRE(test::status_orig == MPI_SUCCESS); - MPI_Offset offset; - MPI_File_get_position(test::fh_orig, &offset); - REQUIRE(offset == (long long)(args.request_size * info.num_iterations)); - test::test_read_all(info.read_data.data(), args.request_size, MPI_CHAR); - REQUIRE((size_t)test::size_read_orig == args.request_size); - test::test_close(); - REQUIRE(test::status_orig == MPI_SUCCESS); - } - SECTION("read_at_all from existing file") { test::test_open(info.existing_file.c_str(), MPI_MODE_RDONLY, MPI_COMM_SELF); REQUIRE(test::status_orig == MPI_SUCCESS); @@ -1066,21 +1051,6 @@ TEST_CASE("SingleAsyncReadCollective", REQUIRE(test::status_orig == MPI_SUCCESS); } - SECTION("read at the end of existing file") { - test::test_open(info.shared_existing_file.c_str(), MPI_MODE_RDONLY, - MPI_COMM_WORLD); - REQUIRE(test::status_orig == MPI_SUCCESS); - test::test_seek(0, MPI_SEEK_END); - REQUIRE(test::status_orig == MPI_SUCCESS); - MPI_Offset offset; - MPI_File_get_position(test::fh_orig, &offset); - REQUIRE(offset == (long long)(args.request_size * info.num_iterations)); - test::test_iread_all(info.read_data.data(), args.request_size, MPI_CHAR); - REQUIRE((size_t)test::size_read_orig == args.request_size); - test::test_close(); - REQUIRE(test::status_orig == MPI_SUCCESS); - } - SECTION("read_at_all from existing file") { test::test_open(info.existing_file.c_str(), MPI_MODE_RDONLY, MPI_COMM_SELF); REQUIRE(test::status_orig == MPI_SUCCESS); diff --git a/adapter/test/mpiio/mpiio_adapter_test.cpp b/adapter/test/mpiio/mpiio_adapter_test.cpp index 5e49a56b8..ce21529fa 100644 --- a/adapter/test/mpiio/mpiio_adapter_test.cpp +++ b/adapter/test/mpiio/mpiio_adapter_test.cpp @@ -30,6 +30,7 @@ struct Arguments { size_t request_size = 65536; }; struct Info { + bool debug = false; int rank = 0; int comm_size = 1; std::string write_data; @@ -78,6 +79,12 @@ int init(int* argc, char*** argv) { info.read_data = std::string(args.request_size, 'r'); MPI_Comm_rank(MPI_COMM_WORLD, &info.rank); MPI_Comm_size(MPI_COMM_WORLD, &info.comm_size); + if (info.debug && info.rank == 0) { + printf("ready for attach\n"); + fflush(stdout); + sleep(30); + } + MPI_Barrier(MPI_COMM_WORLD); return 0; } int finalize() { diff --git a/adapter/test/stdio/CMakeLists.txt b/adapter/test/stdio/CMakeLists.txt index 5e9fc1f95..ca7375920 100644 --- a/adapter/test/stdio/CMakeLists.txt +++ b/adapter/test/stdio/CMakeLists.txt @@ -1,5 +1,5 @@ function(gcc_hermes exec tag_name tags conf) - add_test(NAME Test${exec}_${tag_name} COMMAND "${CMAKE_BINARY_DIR}/bin/${exec}" ${tags} -s --reporter compact) + add_test(NAME Test${exec}_${tag_name} COMMAND 
"${CMAKE_BINARY_DIR}/bin/${exec}" ${tags} --reporter compact) set_property(TEST Test${exec}_${tag_name} PROPERTY ENVIRONMENT HERMES_CONF=${CMAKE_SOURCE_DIR}/adapter/test/data/${conf}.conf) set_property(TEST Test${exec}_${tag_name} APPEND @@ -7,7 +7,7 @@ function(gcc_hermes exec tag_name tags conf) endfunction() function(gcc_hermes_mode exec tag_name tags mode path) set(test_name Test${exec}_${tag_name}_${mode}_${path}) - add_test(NAME ${test_name} COMMAND "${CMAKE_BINARY_DIR}/bin/${exec}" ${tags} -s --reporter compact) + add_test(NAME ${test_name} COMMAND "${CMAKE_BINARY_DIR}/bin/${exec}" ${tags} --reporter compact) set_property(TEST ${test_name} PROPERTY ENVIRONMENT HERMES_CONF=${CMAKE_SOURCE_DIR}/adapter/test/data/hermes.conf) set_property(TEST ${test_name} APPEND diff --git a/src/api/vbucket.cc b/src/api/vbucket.cc index ca6161533..ab1eee1f5 100644 --- a/src/api/vbucket.cc +++ b/src/api/vbucket.cc @@ -138,14 +138,19 @@ Blob& VBucket::GetBlob(std::string blob_name, std::string bucket_name) { return local_blob; } -std::vector VBucket::GetLinks(Context& ctx) { +std::vector VBucket::GetLinks(Context& ctx) { (void)ctx; LOG(INFO) << "Getting subset of links satisfying pred in VBucket " << name_ << '\n'; auto blob_ids = GetBlobsFromVBucketInfo(&hermes_->context_, &hermes_->rpc_, id_); + auto blob_names = std::vector(); + for (const auto& blob_id : blob_ids) { + blob_names.push_back( + GetBlobNameFromId(&hermes_->context_, &hermes_->rpc_, blob_id)); + } // TODO(hari): add filtering - return blob_ids; + return blob_names; } Status VBucket::Attach(Trait* trait) { @@ -259,12 +264,12 @@ std::vector VBucket::GetTraits(Predicate pred, Context& ctx) { } Status VBucket::Delete() { - Status result = Delete(ctx_); + Status result = Destroy(ctx_); return result; } -Status VBucket::Delete(Context& ctx) { +Status VBucket::Destroy(Context& ctx) { (void)ctx; Status ret; @@ -350,6 +355,7 @@ Status VBucket::Delete(Context& ctx) { } } attached_traits_.clear(); + DestroyVBucket(&hermes_->context_, &hermes_->rpc_, this->name_.c_str(), id_); return Status(); } diff --git a/src/api/vbucket.h b/src/api/vbucket.h index 8af6ff305..24dade710 100644 --- a/src/api/vbucket.h +++ b/src/api/vbucket.h @@ -87,7 +87,7 @@ class VBucket { /** retrieves the subset of blob links satisfying pred */ /** could return iterator */ - std::vector GetLinks(Context &ctx); + std::vector GetLinks(Context &ctx); /** attach a trait to this vbucket */ Status Attach(Trait *trait, Context &ctx); @@ -103,7 +103,7 @@ class VBucket { /** delete a vBucket */ /** decrements the links counts of blobs in buckets */ - Status Delete(Context &ctx); + Status Destroy(Context &ctx); Status Delete(); }; // class VBucket diff --git a/src/metadata_management.cc b/src/metadata_management.cc index 58f3754a8..4dd52f58b 100644 --- a/src/metadata_management.cc +++ b/src/metadata_management.cc @@ -782,6 +782,20 @@ bool DestroyBucket(SharedMemoryContext *context, RpcContext *rpc, return destroyed; } +bool DestroyVBucket(SharedMemoryContext *context, RpcContext *rpc, + const char *name, VBucketID vbucket_id) { + u32 target_node = vbucket_id.bits.node_id; + bool destroyed = false; + if (target_node == rpc->node_id) { + destroyed = LocalDestroyVBucket(context, name, vbucket_id); + } else { + destroyed = RpcCall(rpc, target_node, "RemoteDestroyVBucket", + std::string(name), vbucket_id); + } + + return destroyed; +} + void LocalRenameBucket(SharedMemoryContext *context, RpcContext *rpc, BucketID id, const std::string &old_name, const std::string &new_name) { diff --git 
a/src/metadata_management.h b/src/metadata_management.h index 5e77cd0e9..fb8842d85 100644 --- a/src/metadata_management.h +++ b/src/metadata_management.h @@ -153,6 +153,12 @@ void InitNeighborhoodTargets(SharedMemoryContext *context, RpcContext *rpc); bool DestroyBucket(SharedMemoryContext *context, RpcContext *rpc, const char *name, BucketID bucket_id); +/** + * + */ +bool DestroyVBucket(SharedMemoryContext *context, RpcContext *rpc, + const char *name, VBucketID vbucket_id); + /** * */ diff --git a/src/metadata_management_internal.h b/src/metadata_management_internal.h index 48c9bb213..737c584fb 100644 --- a/src/metadata_management_internal.h +++ b/src/metadata_management_internal.h @@ -47,6 +47,8 @@ void LocalGetBufferIdList(Arena *arena, MetadataManager *mdm, BlobID blob_id, void LocalFreeBufferIdList(SharedMemoryContext *context, BlobID blob_id); bool LocalDestroyBucket(SharedMemoryContext *context, RpcContext *rpc, const char *bucket_name, BucketID bucket_id); +bool LocalDestroyVBucket(SharedMemoryContext *context, const char *vbucket_name, + VBucketID vbucket_id); void LocalDestroyBlobById(SharedMemoryContext *context, RpcContext *rpc, BlobID blob_id, BucketID bucket_id); void LocalDestroyBlobByName(SharedMemoryContext *context, RpcContext *rpc, diff --git a/src/metadata_storage_stb_ds.cc b/src/metadata_storage_stb_ds.cc index 90e8ee182..0e56614f4 100644 --- a/src/metadata_storage_stb_ds.cc +++ b/src/metadata_storage_stb_ds.cc @@ -432,8 +432,20 @@ bool LocalContainsBlob(SharedMemoryContext *context, BucketID bucket_id, return result; } +static inline bool HasAllocated(ChunkedIdList *list) { + bool result = list->capacity > 0; + + return result; +} + +static inline bool HasAllocatedBlobs(VBucketInfo *info) { + bool result = HasAllocated(&info->blobs); + + return result; +} + static inline bool HasAllocatedBlobs(BucketInfo *info) { - bool result = info->blobs.capacity > 0; + bool result = HasAllocated(&info->blobs); return result; } @@ -493,6 +505,45 @@ bool LocalDestroyBucket(SharedMemoryContext *context, RpcContext *rpc, return destroyed; } +bool LocalDestroyVBucket(SharedMemoryContext *context, const char *vbucket_name, + VBucketID vbucket_id) { + bool destroyed = false; + MetadataManager *mdm = GetMetadataManagerFromContext(context); + BeginTicketMutex(&mdm->vbucket_mutex); + VBucketInfo *info = LocalGetVBucketInfoById(mdm, vbucket_id); + + // TODO(chogan): @optimization Lock granularity can probably be relaxed if + // this is slow + int ref_count = info->ref_count.load(); + if (ref_count == 1) { + if (HasAllocatedBlobs(info)) { + FreeIdList(mdm, info->blobs); + } + + info->blobs.length = 0; + info->blobs.capacity = 0; + info->blobs.head_offset = 0; + + // Reset VBucketInfo to initial values + info->ref_count.store(0); + info->active = false; + info->stats = {}; + + mdm->num_vbuckets--; + info->next_free = mdm->first_free_vbucket; + mdm->first_free_vbucket = vbucket_id; + + // Remove (name -> vbucket_id) map entry + LocalDelete(mdm, vbucket_name, kMapType_VBucket); + destroyed = true; + } else { + LOG(INFO) << "Cannot destroy bucket " << vbucket_name + << ". 
Its refcount is " << ref_count << std::endl; + } + EndTicketMutex(&mdm->vbucket_mutex); + return destroyed; +} + std::vector LocalGetTargets(MetadataManager *mdm, IdList target_list) { u32 length = target_list.length; diff --git a/src/rpc_thallium.cc b/src/rpc_thallium.cc index 900bb139f..54ec5ef36 100644 --- a/src/rpc_thallium.cc +++ b/src/rpc_thallium.cc @@ -220,6 +220,12 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc, req.respond(result); }; + auto rpc_destroy_vbucket = + [context](const request &req, const string &name, VBucketID id) { + bool result = LocalDestroyVBucket(context, name.c_str(), id); + req.respond(result); + }; + auto rpc_get_or_create_bucket_id = [context](const request &req, const std::string &name) { BucketID result = LocalGetOrCreateBucketId(context, name); @@ -372,6 +378,7 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc, rpc_server->define("RemoteAddBlobIdToBucket", rpc_add_blob_bucket); rpc_server->define("RemoteAddBlobIdToVBucket", rpc_add_blob_vbucket); rpc_server->define("RemoteDestroyBucket", rpc_destroy_bucket); + rpc_server->define("RemoteDestroyVBucket", rpc_destroy_vbucket); rpc_server->define("RemoteRenameBucket", rpc_rename_bucket); rpc_server->define("RemoteDestroyBlobByName", rpc_destroy_blob_by_name); rpc_server->define("RemoteDestroyBlobById", rpc_destroy_blob_by_id); diff --git a/test/data/asan.supp b/test/data/asan.supp index e9420bf30..77f836cd6 100644 --- a/test/data/asan.supp +++ b/test/data/asan.supp @@ -4,4 +4,5 @@ leak:libabt.so leak:libmargo.so leak:libmpi.so leak:librdmacm.so -leak:libhwloc.so \ No newline at end of file +leak:libhwloc.so +leak:libmpich.so \ No newline at end of file diff --git a/test/vbucket_test.cc b/test/vbucket_test.cc index 4d631f6d7..17ff4476b 100644 --- a/test/vbucket_test.cc +++ b/test/vbucket_test.cc @@ -90,7 +90,7 @@ int main(int argc, char **argv) { } } if (app_rank == 0) { - shared.Delete(ctx); + shared.Destroy(ctx); } hermes->AppBarrier(); } else {
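Editor's note, not part of the patch: the MPI_File_read_ordered and MPI_File_write_ordered wrappers above emulate the shared file pointer by taking an inclusive prefix sum (MPI_Scan) of the per-rank counts and then calling the *_at_all collectives at the derived offset. A minimal standalone sketch of that offset computation, assuming MPI_INT counts and MPI_COMM_WORLD purely for illustration:

// Illustrative sketch only; not part of the patch.
#include <mpi.h>

#include <cstdio>

int main(int argc, char **argv) {
  MPI_Init(&argc, &argv);
  int rank = 0;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  int count = 1024 * (rank + 1);  // per-rank element count (made up)
  int total = 0;
  // Inclusive prefix sum of counts over ranks 0..rank.
  MPI_Scan(&count, &total, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
  // Exclusive prefix sum: where this rank's "ordered" access would start.
  MPI_Offset my_offset = total - count;

  std::printf("rank %d: count=%d offset=%lld\n", rank, count,
              static_cast<long long>(my_offset));
  MPI_Finalize();
  return 0;
}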
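Also an editor's sketch rather than part of the patch: a small MPI-IO program of the kind the wrappers above intercept. It assumes the binary is linked against the hermes_mpiio wrapper library with -DHERMES_INTERCEPT=1 (as the hermes_mpiio_adapter_test target is) and that HERMES_CONF points at a valid Hermes configuration; the file name and request size are placeholders.

// Illustrative sketch only; not part of the patch.
#include <mpi.h>

#include <cstdio>
#include <cstring>

int main(int argc, char **argv) {
  MPI_Init(&argc, &argv);
  int rank = 0;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  const int kCount = 4096;  // placeholder request size
  char write_buf[kCount];
  char read_buf[kCount];
  std::memset(write_buf, 'w', kCount);
  std::memset(read_buf, 0, kCount);

  MPI_File fh;
  MPI_Status status;
  MPI_Offset offset = static_cast<MPI_Offset>(rank) * kCount;

  // Each call below goes through the adapter when interception is enabled.
  MPI_File_open(MPI_COMM_WORLD, "adapter_example.dat",  // placeholder path
                MPI_MODE_CREATE | MPI_MODE_RDWR, MPI_INFO_NULL, &fh);
  MPI_File_write_at(fh, offset, write_buf, kCount, MPI_CHAR, &status);
  MPI_File_sync(fh);
  MPI_File_read_at(fh, offset, read_buf, kCount, MPI_CHAR, &status);
  MPI_File_close(&fh);

  if (rank == 0) {
    std::printf("round trip %s\n",
                std::memcmp(write_buf, read_buf, kCount) == 0 ? "ok"
                                                              : "mismatch");
  }
  MPI_Finalize();
  return 0;
}

With interception on, the writes are buffered in Hermes and flushed back to the file on MPI_File_sync/MPI_File_close, mirroring the VBucket/FileMappingTrait flush path in the MPI_File_sync wrapper above.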