
Commit

Merge 7d2c75d into 6211991
ChristopherHogan committed Jun 17, 2021
2 parents 6211991 + 7d2c75d commit 9f92117
Showing 12 changed files with 307 additions and 90 deletions.
2 changes: 2 additions & 0 deletions adapter/include/hermes/adapter/mpiio.h
@@ -119,5 +119,7 @@ HERMES_FORWARD_DECL(MPI_File_get_position, int,
*/
HERMES_FORWARD_DECL(MPI_Init, int, (int *argc, char ***argv));
HERMES_FORWARD_DECL(MPI_Finalize, int, (void));
HERMES_FORWARD_DECL(MPI_Wait, int, (MPI_Request*, MPI_Status*));
HERMES_FORWARD_DECL(MPI_Waitall, int, (int, MPI_Request*, MPI_Status*));

#endif // HERMES_MPIIO_H
4 changes: 4 additions & 0 deletions adapter/src/hermes/adapter/mpiio/common/constants.h
@@ -44,4 +44,8 @@ const size_t kPageSize = 1024 * 1024;
*/
const char kStringDelimiter = '#';

/**
* Number of threads in the thread pool used by the asynchronous I/O APIs.
*/
const int kNumThreads = 1;
#endif // HERMES_MPIIO_COMMON_CONSTANTS_H
8 changes: 8 additions & 0 deletions adapter/src/hermes/adapter/mpiio/common/datastructures.h
@@ -32,6 +32,8 @@
#include <hermes_types.h>
#include <mpi.h>

#include <future>

/**
* Namespace simplification.
*/
@@ -161,5 +163,11 @@ struct AdapterStat {
}
};


struct HermesRequest {
std::future<int> return_future;
MPI_Status status;
};

} // namespace hermes::adapter::mpiio
#endif // HERMES_MPIIO_ADAPTER_DATASTRUCTURES_H
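
HermesRequest pairs the std::future<int> handed back by ThreadPool::run with the MPI_Status filled in by the background call; the pool itself lives in hermes/adapter/thread_pool.h, which is not among the hunks shown here. A minimal sketch of a pool exposing that interface follows — the class layout and member names are assumptions for illustration, not the adapter's actual implementation:

// Hypothetical stand-in for hermes::adapter::ThreadPool (the real class is in
// hermes/adapter/thread_pool.h, not shown in these hunks). It only needs to
// expose run(), which queues a callable returning int and hands back the
// std::future<int> that HermesRequest stores.
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class ThreadPool {
 public:
  explicit ThreadPool(int num_threads) {
    for (int i = 0; i < num_threads; ++i) {
      workers_.emplace_back([this] {
        for (;;) {
          std::packaged_task<int()> task;
          {
            std::unique_lock<std::mutex> lock(mutex_);
            cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
            if (stop_ && tasks_.empty()) return;
            task = std::move(tasks_.front());
            tasks_.pop();
          }
          task();  // Runs the bound MPI call; its return value feeds the future.
        }
      });
    }
  }

  ~ThreadPool() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      stop_ = true;
    }
    cv_.notify_all();
    for (auto &worker : workers_) worker.join();
  }

  // Queue a callable returning int and hand back the future tied to it.
  std::future<int> run(std::function<int()> func) {
    std::packaged_task<int()> task(std::move(func));
    std::future<int> result = task.get_future();
    {
      std::lock_guard<std::mutex> lock(mutex_);
      tasks_.push(std::move(task));
    }
    cv_.notify_one();
    return result;
  }

 private:
  std::vector<std::thread> workers_;
  std::queue<std::packaged_task<int()>> tasks_;
  std::mutex mutex_;
  std::condition_variable cv_;
  bool stop_ = false;
};

With this shape, req->return_future = pool->run(std::bind(MPI_File_read_at, fh, offset, buf, count, datatype, &req->status)) in the wrappers below produces the future that MPI_Wait later resolves.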
8 changes: 6 additions & 2 deletions adapter/src/hermes/adapter/mpiio/metadata_manager.h
@@ -58,11 +58,15 @@ class MetadataManager {
*/
int rank;
int comm_size;

/**
* Map an outstanding asynchronous MPI_Request to its corresponding HermesRequest.
*/
std::unordered_map<MPI_Request*, HermesRequest*> request_map;
/**
* Constructor
*/
MetadataManager() : metadata(), ref(0), rank(0), comm_size(1) {}
MetadataManager()
: metadata(), ref(0), rank(0), comm_size(1), request_map() {}
/**
* Get the instance of hermes.
*/
229 changes: 173 additions & 56 deletions adapter/src/hermes/adapter/mpiio/mpiio.cc
@@ -10,16 +10,20 @@
* have access to the file, you may request a copy from help@hdfgroup.org. *
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */

/**
* Internal headers
*/
#include <hermes/adapter/mpiio.h>
#include <hermes/adapter/thread_pool.h>

#include <hermes/adapter/mpiio/mapper/balanced_mapper.cc>

#include "mpi.h"
/**
* Namespace declarations
*/
using hermes::adapter::ThreadPool;
using hermes::adapter::mpiio::AdapterStat;
using hermes::adapter::mpiio::FileStruct;
using hermes::adapter::mpiio::HermesRequest;
using hermes::adapter::mpiio::MapperFactory;
using hermes::adapter::mpiio::MetadataManager;

@@ -99,7 +103,7 @@ size_t perform_file_read(const char *filename, size_t file_offset, void *ptr,
MPI_INFO_NULL, &fh);
int read_size = 0;
if (status == MPI_SUCCESS) {
int status = MPI_File_seek(fh, file_offset, MPI_SEEK_SET);
status = MPI_File_seek(fh, file_offset, MPI_SEEK_SET);
if (status == MPI_SUCCESS) {
MPI_Status read_status;
status = MPI_File_read(fh, (char *)ptr + ptr_offset, count, datatype,
@@ -315,6 +319,7 @@ std::pair<int, size_t> write_internal(std::pair<AdapterStat, bool> &existing,
}
}
}
vbucket.Release();
}
}
data_offset += item.first.size_;
@@ -468,8 +473,8 @@ std::pair<int, size_t> read_internal(std::pair<AdapterStat, bool> &existing,
perform_file_read(filename.c_str(), item.first.offset_, ptr,
total_read_size, item.first.size_, MPI_CHAR);
}
vbucket.Release();
}

if (read_size > 0) {
total_read_size += read_size;
}
@@ -490,6 +495,8 @@ int HERMES_DECL(MPI_Init)(int *argc, char ***argv) {
LOG(INFO) << "MPI Init intercepted." << std::endl;
auto mdm = hermes::adapter::Singleton<MetadataManager>::GetInstance();
mdm->InitializeHermes();
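// Construct the thread-pool singleton up front; the asynchronous wrappers
// below obtain and reuse the same instance via GetInstance(kNumThreads).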
auto pool =
hermes::adapter::Singleton<ThreadPool>::GetInstance(kNumThreads);
}
return status;
}
@@ -502,6 +509,34 @@ int HERMES_DECL(MPI_Finalize)(void) {
int status = real_MPI_Finalize_();
return status;
}

int HERMES_DECL(MPI_Wait)(MPI_Request *req, MPI_Status *status) {
int ret;
auto mdm = hermes::adapter::Singleton<MetadataManager>::GetInstance();
auto iter = mdm->request_map.find(req);
if (iter != mdm->request_map.end()) {
ret = iter->second->return_future.get();
if (status != MPI_STATUS_IGNORE) {
memcpy(status, &iter->second->status, sizeof(MPI_Status));
}
auto h_req = iter->second;
mdm->request_map.erase(iter);
delete (h_req);
} else {
MAP_OR_FAIL(MPI_Wait);
ret = real_MPI_Wait_(req, status);
}
return ret;
}

int HERMES_DECL(MPI_Waitall)(int count, MPI_Request *req, MPI_Status *status) {
int ret = MPI_SUCCESS;
for (int i = 0; i < count; i++) {
auto sub_ret = MPI_Wait(&req[i], &status[i]);
if (sub_ret != MPI_SUCCESS) {
ret = sub_ret;
}
}
return ret;
}
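
With MPI_Wait and MPI_Waitall intercepted, requests created by the asynchronous wrappers further down resolve through request_map, while untracked requests fall through to the real MPI library. A toy caller exercising that path (the file name and buffer size are placeholders; the adapter is assumed to be interposed, e.g. via LD_PRELOAD):

#include <mpi.h>
#include <string.h>

int main(int argc, char **argv) {
  MPI_Init(&argc, &argv);  // Intercepted: initializes Hermes and the thread pool.

  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  MPI_File fh;
  MPI_File_open(MPI_COMM_WORLD, "async_test.dat",
                MPI_MODE_CREATE | MPI_MODE_RDWR, MPI_INFO_NULL, &fh);

  char buf[1024];
  memset(buf, 'x', sizeof(buf));
  MPI_Request req;
  MPI_Status status;

  // Intercepted: the write runs on the adapter's thread pool and the request
  // is recorded in the MetadataManager's request_map.
  MPI_File_iwrite_at(fh, rank * (MPI_Offset)sizeof(buf), buf,
                     (int)sizeof(buf), MPI_CHAR, &req);

  // Intercepted: blocks on the stored future and copies out the MPI_Status.
  MPI_Wait(&req, &status);

  MPI_File_close(&fh);
  MPI_Finalize();
  return 0;
}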
/**
* Metadata functions
*/
@@ -813,12 +848,12 @@ int HERMES_DECL(MPI_File_write_all)(MPI_File fh, const void *buf, int count,
ret = write_ret.first;
MPI_Barrier(existing.first.comm);
} else {
MAP_OR_FAIL(MPI_File_write);
ret = real_MPI_File_write_(fh, buf, count, datatype, status);
MAP_OR_FAIL(MPI_File_write_all);
ret = real_MPI_File_write_all_(fh, buf, count, datatype, status);
}
} else {
MAP_OR_FAIL(MPI_File_write);
ret = real_MPI_File_write_(fh, buf, count, datatype, status);
MAP_OR_FAIL(MPI_File_write_all);
ret = real_MPI_File_write_all_(fh, buf, count, datatype, status);
}
return (ret);
}
@@ -909,63 +944,122 @@ int HERMES_DECL(MPI_File_write_shared)(MPI_File fh, const void *buf, int count,
int HERMES_DECL(MPI_File_iread_at)(MPI_File fh, MPI_Offset offset, void *buf,
int count, MPI_Datatype datatype,
MPI_Request *request) {
(void)fh;
(void)buf;
(void)count;
(void)datatype;
(void)request;
(void)offset;
return 0;
int ret;
if (IsTracked(&fh)) {
auto pool =
hermes::adapter::Singleton<ThreadPool>::GetInstance(kNumThreads);
HermesRequest *req = new HermesRequest();
auto func = std::bind(MPI_File_read_at, fh, offset, buf, count, datatype,
&req->status);
req->return_future = pool->run(func);
auto mdm = hermes::adapter::Singleton<MetadataManager>::GetInstance();
mdm->request_map.emplace(request, req);
ret = MPI_SUCCESS;
} else {
MAP_OR_FAIL(MPI_File_iread_at);
ret = real_MPI_File_iread_at_(fh, offset, buf, count, datatype, request);
}
return ret;
}
int HERMES_DECL(MPI_File_iread)(MPI_File fh, void *buf, int count,
MPI_Datatype datatype, MPI_Request *request) {
(void)fh;
(void)buf;
(void)count;
(void)datatype;
(void)request;
return 0;
int ret;
if (IsTracked(&fh)) {
auto pool =
hermes::adapter::Singleton<ThreadPool>::GetInstance(kNumThreads);
HermesRequest *req = new HermesRequest();
auto func =
std::bind(MPI_File_read, fh, buf, count, datatype, &req->status);
req->return_future = pool->run(func);
auto mdm = hermes::adapter::Singleton<MetadataManager>::GetInstance();
mdm->request_map.emplace(request, req);
ret = MPI_SUCCESS;
} else {
MAP_OR_FAIL(MPI_File_iread);
ret = real_MPI_File_iread_(fh, buf, count, datatype, request);
}
return ret;
}
int HERMES_DECL(MPI_File_iread_shared)(MPI_File fh, void *buf, int count,
MPI_Datatype datatype,
MPI_Request *request) {
(void)fh;
(void)buf;
(void)count;
(void)datatype;
(void)request;
return 0;
int ret;
if (IsTracked(&fh)) {
auto pool =
hermes::adapter::Singleton<ThreadPool>::GetInstance(kNumThreads);
HermesRequest *req = new HermesRequest();
auto func =
std::bind(MPI_File_read_shared, fh, buf, count, datatype, &req->status);
req->return_future = pool->run(func);
auto mdm = hermes::adapter::Singleton<MetadataManager>::GetInstance();
mdm->request_map.emplace(request, req);
ret = MPI_SUCCESS;
} else {
MAP_OR_FAIL(MPI_File_iread_shared);
ret = real_MPI_File_iread_shared_(fh, buf, count, datatype, request);
}
return ret;
}
int HERMES_DECL(MPI_File_iwrite_at)(MPI_File fh, MPI_Offset offset,
const void *buf, int count,
MPI_Datatype datatype,
MPI_Request *request) {
(void)fh;
(void)buf;
(void)count;
(void)datatype;
(void)request;
(void)offset;
return 0;
int ret;
if (IsTracked(&fh)) {
auto pool =
hermes::adapter::Singleton<ThreadPool>::GetInstance(kNumThreads);
HermesRequest *req = new HermesRequest();
auto func = std::bind(MPI_File_write_at, fh, offset, buf, count, datatype,
&req->status);
req->return_future = pool->run(func);
auto mdm = hermes::adapter::Singleton<MetadataManager>::GetInstance();
mdm->request_map.emplace(request, req);
ret = MPI_SUCCESS;
} else {
MAP_OR_FAIL(MPI_File_iwrite_at);
ret = real_MPI_File_iwrite_at_(fh, offset, buf, count, datatype, request);
}
return ret;
}

int HERMES_DECL(MPI_File_iwrite)(MPI_File fh, const void *buf, int count,
MPI_Datatype datatype, MPI_Request *request) {
return 0;
(void)fh;
(void)buf;
(void)count;
(void)datatype;
(void)request;
int ret;
if (IsTracked(&fh)) {
auto pool =
hermes::adapter::Singleton<ThreadPool>::GetInstance(kNumThreads);
HermesRequest *req = new HermesRequest();
auto func =
std::bind(MPI_File_write, fh, buf, count, datatype, &req->status);
req->return_future = pool->run(func);
auto mdm = hermes::adapter::Singleton<MetadataManager>::GetInstance();
mdm->request_map.emplace(request, req);
ret = MPI_SUCCESS;
} else {
MAP_OR_FAIL(MPI_File_iwrite);
ret = real_MPI_File_iwrite_(fh, buf, count, datatype, request);
}
return ret;
}
int HERMES_DECL(MPI_File_iwrite_shared)(MPI_File fh, const void *buf, int count,
MPI_Datatype datatype,
MPI_Request *request) {
(void)fh;
(void)buf;
(void)count;
(void)datatype;
(void)request;
return 0;
int ret;
if (IsTracked(&fh)) {
auto pool =
hermes::adapter::Singleton<ThreadPool>::GetInstance(kNumThreads);
HermesRequest *req = new HermesRequest();
auto func = std::bind(MPI_File_write_shared, fh, buf, count, datatype,
&req->status);
req->return_future = pool->run(func);
auto mdm = hermes::adapter::Singleton<MetadataManager>::GetInstance();
mdm->request_map.emplace(request, req);
ret = MPI_SUCCESS;
} else {
MAP_OR_FAIL(MPI_File_iwrite_shared);
ret = real_MPI_File_iwrite_shared_(fh, buf, count, datatype, request);
}
return ret;
}

/**
@@ -979,25 +1073,48 @@ int HERMES_DECL(MPI_File_sync)(MPI_File fh) {
if (existing.second) {
LOG(INFO) << "Intercept MPI_File_sync." << std::endl;
auto filename = existing.first.st_bkid->GetName();
auto persist = INTERCEPTOR_LIST->Persists(filename);
mdm->Delete(&fh);
hapi::Context ctx;
const auto &blob_names = existing.first.st_blobs;
if (!blob_names.empty() && INTERCEPTOR_LIST->Persists(filename)) {
INTERCEPTOR_LIST->hermes_flush_exclusion.insert(filename);
LOG(INFO) << "File handler is opened by adapter." << std::endl;
hapi::Context ctx;
if (!blob_names.empty() && persist) {
LOG(INFO) << "Adapter flushes " << blob_names.size()
<< " blobs to filename:" << filename << "." << std::endl;
hermes::api::VBucket file_vbucket(filename, mdm->GetHermes(), true,
ctx);
INTERCEPTOR_LIST->hermes_flush_exclusion.insert(filename);
hermes::api::VBucket file_vbucket(filename, mdm->GetHermes(), true);
auto offset_map = std::unordered_map<std::string, hermes::u64>();
for (const auto &blob_name : blob_names) {
file_vbucket.Link(blob_name, filename, ctx);
auto page_index = std::stol(blob_name) - 1;
offset_map.emplace(blob_name, page_index * kPageSize);

for (auto blob_name : blob_names) {
auto h_struct = mdm->DecodeBlobNameLocal(blob_name);
auto status = file_vbucket.Link(blob_name, filename, ctx);
if (!status.Failed()) {
if (h_struct.first == -1) {
auto page_index = std::stol(blob_name) - 1;
offset_map.emplace(blob_name, page_index * kPageSize);
} else {
auto page_index = std::stol(h_struct.second.blob_name_) - 1;
offset_map.emplace(
blob_name, page_index * kPageSize + h_struct.second.offset_);
}
}
}
auto trait = hermes::api::FileMappingTrait(filename, offset_map,
nullptr, NULL, NULL);
file_vbucket.Attach(&trait, ctx);
file_vbucket.Destroy(ctx);
for (const auto &vbucket : existing.first.st_vbuckets) {
hermes::api::VBucket blob_vbucket(vbucket, mdm->GetHermes(), false,
ctx);
auto blob_names_v = blob_vbucket.GetLinks(ctx);
for (auto &blob_name : blob_names_v) {
blob_vbucket.Unlink(blob_name, existing.first.st_bkid->GetName());
}
auto blob_names_temp = blob_vbucket.GetLinks(ctx);
blob_vbucket.Destroy(ctx);
}
for (auto &blob_name : existing.first.st_blobs) {
existing.first.st_bkid->DeleteBlob(blob_name);
}
existing.first.st_blobs.clear();
INTERCEPTOR_LIST->hermes_flush_exclusion.erase(filename);
}
(Diffs for the remaining changed files are not shown.)

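For reference, the offset map built in the MPI_File_sync hunk above places each blob at page_index * kPageSize, where the page index comes from the 1-based blob name, plus an intra-page offset for blobs whose encoded name carries one. A small self-contained illustration (the handling of encoded names is simplified here and is an assumption, not the adapter's DecodeBlobNameLocal):

#include <cstdint>
#include <iostream>
#include <string>

constexpr std::uint64_t kPageSize = 1024 * 1024;  // Matches the adapter constant.

// Hypothetical helper: file offset of a blob that maps to 1-based page
// `page_name`, optionally starting `in_page_offset` bytes into that page.
std::uint64_t BlobFileOffset(const std::string &page_name,
                             std::uint64_t in_page_offset = 0) {
  std::uint64_t page_index = std::stoul(page_name) - 1;
  return page_index * kPageSize + in_page_offset;
}

int main() {
  std::cout << BlobFileOffset("1") << "\n";       // 0
  std::cout << BlobFileOffset("3") << "\n";       // 2097152 (2 MiB)
  std::cout << BlobFileOffset("3", 512) << "\n";  // 2097664
}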