Skip to content

Commit

Permalink
Merge pull request #110 from hariharan-devarajan/hariharan/stdio_mpi
Browse files Browse the repository at this point in the history
Hariharan/stdio mpi
  • Loading branch information
ChristopherHogan committed Feb 8, 2021
2 parents a18d960 + f9330be commit 1e39d63
Show file tree
Hide file tree
Showing 31 changed files with 1,571 additions and 998 deletions.
6 changes: 6 additions & 0 deletions adapter/include/hermes/adapter/stdio.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,10 @@ HERMES_FORWARD_DECL(fsetpos, int, (FILE * stream, const fpos_t *pos));
HERMES_FORWARD_DECL(fsetpos64, int, (FILE * stream, const fpos64_t *pos));
HERMES_FORWARD_DECL(rewind, void, (FILE * stream));

/**
* MPI functions declarations
*/
HERMES_FORWARD_DECL(MPI_Init, int, (int *argc, char ***argv));
HERMES_FORWARD_DECL(MPI_Finalize, int, (void));

#endif // HERMES_ADAPTER_STDIO_H
1 change: 0 additions & 1 deletion adapter/src/hermes/adapter/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,4 @@ const char* kHermesConf = "HERMES_CONF";
* from I/O. This is needed as we should not intercept these files.
*/
const char* kHermesExtension = ".hermes";

#endif // HERMES_ADAPTER_CONSTANTS_H
1 change: 1 addition & 0 deletions adapter/src/hermes/adapter/interceptor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ bool IsTracked(FILE* fh) {
int fno = fileno(fh);
snprintf(proclnk, kMaxSize, "/proc/self/fd/%d", fno);
size_t r = readlink(proclnk, filename, kMaxSize);
filename[r] = '\0';
if (r > 0) {
std::string file_str(filename);
return IsTracked(file_str);
Expand Down
9 changes: 8 additions & 1 deletion adapter/src/hermes/adapter/stdio/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@ set(STDIO_ADAPTER_PRIVATE_HEADER ${CMAKE_CURRENT_SOURCE_DIR}/metadata_manager.h
${CMAKE_CURRENT_SOURCE_DIR}/common/enumerations.h
${CMAKE_CURRENT_SOURCE_DIR}/common/constants.h)

set(STDIO_INTERNAL_ADAPTER_SRC ${CMAKE_CURRENT_SOURCE_DIR}/metadata_manager.cc
${CMAKE_CURRENT_SOURCE_DIR}/mapper/balanced_mapper.cc)

# Add library hermes_stdio
add_library(hermes_stdio SHARED ${STDIO_ADAPTER_PRIVATE_HEADER} ${STDIO_ADAPTER_PUBLIC_HEADER} ${STDIO_ADAPTER_SRC})
add_dependencies(hermes_stdio hermes)
target_link_libraries(hermes_stdio hermes MPI::MPI_CXX)
target_link_libraries(hermes_stdio hermes MPI::MPI_CXX)

add_library(hermes_stdio_internal SHARED ${STDIO_ADAPTER_PRIVATE_HEADER} ${STDIO_ADAPTER_PUBLIC_HEADER} ${STDIO_INTERNAL_ADAPTER_SRC})
add_dependencies(hermes_stdio_internal hermes)
target_link_libraries(hermes_stdio_internal hermes MPI::MPI_CXX)
5 changes: 4 additions & 1 deletion adapter/src/hermes/adapter/stdio/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
/**
* Internal header
*/
#include <hermes/adapter/constants.h>
#include <hermes/adapter/stdio/common/enumerations.h>

/**
Expand All @@ -40,5 +39,9 @@ const MapperType kMapperType = MapperType::BALANCED;
* Define kPageSize for balanced mapping.
*/
const size_t kPageSize = 1024 * 1024;
/**
* String delimiter
*/
const char kStringDelimiter = '#';

#endif // HERMES_STDIO_COMMON_CONSTANTS_H
5 changes: 4 additions & 1 deletion adapter/src/hermes/adapter/stdio/common/datastructures.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,10 @@ struct AdapterStat {
* Comparator for comparing two blobs.
*/
static bool CompareBlobs(const std::string &a, const std::string &b) {
return std::stol(a) < std::stol(b);
/* FIXME(hari): change this once we have blob namespace separated per
* bucket.*/
size_t pos = a.find(kStringDelimiter) + 1;
return std::stol(a.substr(pos)) < std::stol(b.substr(pos));
}
};

Expand Down
7 changes: 6 additions & 1 deletion adapter/src/hermes/adapter/stdio/mapper/balanced_mapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ MapperReturnType BalancedMapper::map(const FileStruct& file_op) {

auto mapper_return = MapperReturnType();
size_t size_mapped = 0;
std::hash<FileID> file_hash_t;
size_t file_hash = file_hash_t(file_op.file_id_);
while (file_op.size_ > size_mapped) {
FileStruct file;
file.file_id_ = file_op.file_id_;
Expand All @@ -38,7 +40,10 @@ MapperReturnType BalancedMapper::map(const FileStruct& file_op) {
: file_op.size_ - size_mapped;

file.size_ = hermes.size_;
hermes.blob_name_ = std::to_string(page_index);
/* FIXME(hari): change this once we have blob namespace separated per
* bucket.*/
hermes.blob_name_ = std::to_string(file_hash) + kStringDelimiter +
std::to_string(page_index);
mapper_return.emplace_back(file, hermes);
size_mapped += hermes.size_;
}
Expand Down
37 changes: 33 additions & 4 deletions adapter/src/hermes/adapter/stdio/metadata_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
/**
* Internal headers
*/
#include <hermes/adapter/constants.h>
#include <hermes/adapter/stdio/common/constants.h>
#include <hermes/adapter/stdio/common/datastructures.h>
#include <mpi.h>

namespace hermes::adapter::stdio {
/**
Expand All @@ -48,12 +50,19 @@ class MetadataManager {
* references of how many times hermes was tried to initialize.
*/
std::atomic<size_t> ref;
/**
* MPI attributes
*/
bool is_mpi;
int rank;
int comm_size;

public:
/**
* Constructor
*/
MetadataManager() : metadata(), ref(0) {}
MetadataManager()
: metadata(), ref(0), is_mpi(false), rank(0), comm_size(1) {}
/**
* Get the instance of hermes.
*/
Expand All @@ -65,10 +74,22 @@ class MetadataManager {
* daemon mode. Keep a reference of how many times Initialize is called.
* Within the adapter, Initialize is called from fopen.
*/
void InitializeHermes() {
void InitializeHermes(bool is_mpi = false) {
if (ref == 0) {
this->is_mpi = is_mpi;
char* hermes_config = getenv(kHermesConf);
hermes = hapi::InitHermes(hermes_config, true);
if (this->is_mpi) {
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
if (comm_size > 1) {
hermes = hermes::InitHermesClient(hermes_config);
} else {
this->is_mpi = false;
hermes = hermes::InitHermesDaemon(hermes_config);
}
} else {
hermes = hermes::InitHermesDaemon(hermes_config);
}
}
ref++;
}
Expand All @@ -78,7 +99,15 @@ class MetadataManager {
*/
void FinalizeHermes() {
if (ref == 1) {
hermes->Finalize(true);
if (this->is_mpi) {
MPI_Barrier(MPI_COMM_WORLD);
if (this->rank == 0) {
hermes->RemoteFinalize();
}
hermes->Finalize();
} else {
hermes->Finalize(true);
}
}
ref--;
}
Expand Down
Loading

0 comments on commit 1e39d63

Please sign in to comment.