Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Final stage-in release commits #468

Merged
merged 20 commits into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion CMake/Testing/Temporary/CTestCostData.txt

This file was deleted.

3 changes: 0 additions & 3 deletions CMake/Testing/Temporary/LastTest.log

This file was deleted.

1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ if(CMAKE_PROJECT_NAME STREQUAL HERMES AND BUILD_TESTING)
endif()
enable_testing()
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/test)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/data_stager/test)
endif()

if(HERMES_ENABLE_STDIO_ADAPTER OR HERMES_ENABLE_POSIX_ADAPTER OR
Expand Down
3 changes: 0 additions & 3 deletions adapter/Testing/Temporary/LastTest.log

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ def _CreateH(self, namespace, includes, path):
# Create the class definition
self.h_lines.append(f"namespace hermes::adapter::{namespace} {{")
self.h_lines.append(f"")
self.h_lines.append(f"/** Pointers to the real {namespace} API */")
self.h_lines.append(f"class API {{")

# Create class function pointers
Expand All @@ -175,6 +176,8 @@ def _CreateH(self, namespace, includes, path):
self.h_lines.append(f"}};")
self.h_lines.append(f"}} // namespace hermes::adapter::{namespace}")
self.h_lines.append("")
self.h_lines.append("#undef REQUIRE_API")
self.h_lines.append("")
self.h_lines.append(f"#endif // HERMES_ADAPTER_{namespace.upper()}_H")
self.h_lines.append("")

Expand All @@ -183,23 +186,26 @@ def _CreateH(self, namespace, includes, path):

def require_api(self):
self.h_lines.append(f"#define REQUIRE_API(api_name) \\")
self.h_lines.append(f" if (real_api->api_name == nullptr) {{ \\")
self.h_lines.append(f" if (api_name == nullptr) {{ \\")
self.h_lines.append(f" LOG(FATAL) << \"HERMES Adapter failed to map symbol: \" \\")
self.h_lines.append(f" #api_name << std::endl; \\")
self.h_lines.append(f" exit(1);")
self.h_lines.append(f" std::exit(1); \\")
self.h_lines.append(f" }}")

def add_typedef(self, api):
self.h_lines.append(f"typedef {api.ret} (*{api.type})({api.get_args()});")

def add_intercept_api(self, api):
self.h_lines.append(f" {api.ret} (*{api.real_name})({api.get_args()}) = nullptr;")
self.h_lines.append(f" /** {api.real_name} */")
self.h_lines.append(f" {api.type} {api.real_name} = nullptr;")

def init_api(self, api):
self.h_lines.append(f" if (is_intercepted) {{")
self.h_lines.append(f" {api.real_name} = ({api.type})dlsym(RTLD_NEXT, \"{api.name}\");")
self.h_lines.append(f" }} else {{")
self.h_lines.append(f" {api.real_name} = ({api.type})dlsym(RTLD_DEFAULT, \"{api.name}\");")
self.h_lines.append(f" }}")
self.h_lines.append(f" REQUIRE_API({api.real_name})")

def save(self, path, text):
if path is None:
Expand Down
4 changes: 3 additions & 1 deletion adapter/adapter_generator/posix.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
Api("ssize_t pwrite64(int fd, const void *buf, size_t count, off64_t offset)"),
Api("off_t lseek(int fd, off_t offset, int whence)"),
Api("off64_t lseek64(int fd, off64_t offset, int whence)"),
Api("int __fxstat(int version, int fd, struct stat *buf)"),
Api("int __fxstat(int __ver, int __filedesc, struct stat *__stat_buf)"),
Api("int fsync(int fd)"),
Api("int close(int fd)"),
]

includes = [
"<sys/types.h>",
"<sys/stat.h>",
"<unistd.h>",
"<fcntl.h>",
"\"interceptor.h\"",
Expand Down
1 change: 1 addition & 0 deletions adapter/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ const char kPathDelimiter = ',';
const char* kAdapterDefaultMode = "DEFAULT";
const char* kAdapterBypassMode = "BYPASS";
const char* kAdapterScratchMode = "SCRATCH";
const char* kAdapterWorkflowMode = "WORKFLOW";

/**
* If the \c HERMES_STOP_DAEMON environment variable is unset or has a non-zero
Expand Down
5 changes: 3 additions & 2 deletions adapter/enumerations.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
#define HERMES_ADAPTER_ENUMERATIONS_H
enum class AdapterMode {
kDefault = 0, /**< All/given files are stored on file close or flush. */
kBypass = 1, /**< All/given files are not buffered. */
kScratch = 2 /**< All/given files are ignored on file close or flush. */
kBypass = 1, /**< All/given files are not buffered. */
kScratch = 2, /**< All/given files are ignored on file close or flush. */
kWorkflow = 3 /**< Keep data in hermes until user stages out. */
};

enum class FlushingMode {
Expand Down
40 changes: 32 additions & 8 deletions adapter/filesystem/filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,18 @@ void Filesystem::Open(AdapterStat &stat, File &f, const std::string &path) {
u8 c;
stat.st_bkid =
std::make_shared<hapi::Bucket>(path_str, mdm->GetHermes());
stat.st_bkid->Put(stat.main_lock_blob, &c, 1);
if (!stat.st_bkid->ContainsBlob(stat.main_lock_blob)) {
stat.st_bkid->Put(stat.main_lock_blob, &c, 1);
}
}
MPI_Barrier(stat.comm);
if (rank != 0) {
stat.st_bkid =
std::make_shared<hapi::Bucket>(path_str, mdm->GetHermes());
}

// Create a vbucket for storing all modified blobs (per-rank)

} else {
stat.st_bkid =
std::make_shared<hapi::Bucket>(path_str, mdm->GetHermes());
Expand Down Expand Up @@ -138,7 +143,7 @@ size_t Filesystem::Write(File &f, AdapterStat &stat, const void *ptr,
std::shared_ptr<hapi::Bucket> &bkt = stat.st_bkid;
std::string filename = bkt->GetName();
LOG(INFO) << "Write called for filename: " << filename << " on offset: "
<< stat.st_ptr << " and size: " << total_size << std::endl;
<< off << " and size: " << total_size << std::endl;

size_t ret;
auto mdm = Singleton<MetadataManager>::GetInstance();
Expand Down Expand Up @@ -484,9 +489,7 @@ size_t Filesystem::_ReadNew(BlobPlacementIter &ri) {

if (ri.opts_.dpe_ != PlacementPolicy::kNone) {
LOG(INFO) << "Placing the read blob in the hierarchy" << std::endl;
IoOptions opts(ri.opts_);
opts.seek_ = false;
opts.with_fallback_ = false;
IoOptions opts = IoOptions::PlaceInHermes(ri.opts_);
Write(ri.f_, ri.stat_,
ri.blob_.data() + ri.p_.blob_off_,
ri.p_.bucket_off_,
Expand Down Expand Up @@ -623,6 +626,7 @@ int Filesystem::Sync(File &f, AdapterStat &stat) {
return 0;
}
if (IsAsyncFlush(filename)) {
LOG(INFO) << "Asynchronous flushing enabled" << std::endl;
stat.st_vbkt->WaitForBackgroundFlush();
return 0;
}
Expand Down Expand Up @@ -654,6 +658,23 @@ int Filesystem::Sync(File &f, AdapterStat &stat) {
return _RealSync(f);
}

void SaveModifiedBlobSet(AdapterStat &stat) {
int rank = 0;
auto mdm = Singleton<MetadataManager>::GetInstance();
if (mdm->is_mpi) {
MPI_Comm_rank(stat.comm, &rank);
}
auto filename = stat.st_bkid->GetName();
const auto &blob_names = stat.st_blobs;
std::string vbucket_name = filename + "#" +
std::to_string(rank) + "#sync";
hapi::VBucket file_vbucket(vbucket_name, mdm->GetHermes());
auto offset_map = std::unordered_map<std::string, hermes::u64>();
for (const auto &blob_name : blob_names) {
file_vbucket.Link(blob_name, filename);
}
}

int Filesystem::Close(File &f, AdapterStat &stat, bool destroy) {
hapi::Context ctx;
int rank = 0;
Expand All @@ -678,19 +699,22 @@ int Filesystem::Close(File &f, AdapterStat &stat, bool destroy) {
}
MPI_Barrier(stat.comm);
}
if (INTERCEPTOR_LIST->adapter_mode == AdapterMode::kScratch) {
if (INTERCEPTOR_LIST->adapter_mode == AdapterMode::kScratch ||
INTERCEPTOR_LIST->adapter_mode == AdapterMode::kWorkflow) {
destroy = false;
}

Sync(f, stat);
Sync(f, stat); // TODO(llogan): should wait until hermes destroyed to flush
auto filename = stat.st_bkid->GetName();
if (mdm->is_mpi) { MPI_Barrier(stat.comm); }
if (IsAsyncFlush(filename)) {
stat.st_vbkt->Destroy();
}
mdm->Delete(f);
if (mdm->is_mpi) { MPI_Barrier(stat.comm); }
if (destroy) { stat.st_bkid->Destroy(ctx); }
if (destroy) {
stat.st_bkid->Destroy(ctx);
}
if (stat.amode & MPI_MODE_DELETE_ON_CLOSE) {
stdfs::remove(filename);
}
Expand Down
18 changes: 16 additions & 2 deletions adapter/filesystem/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,18 @@ struct IoOptions {
opts.seek_ = seek;
return opts;
}

/**
* Ensure that I/O goes only to Hermes, and does not fall back to PFS.
*
* @param orig_opts The original options to modify
* */
static IoOptions PlaceInHermes(IoOptions &orig_opts) {
IoOptions opts(orig_opts);
opts.seek_ = false;
opts.with_fallback_ = false;
return opts;
}
};

/**
Expand Down Expand Up @@ -337,10 +349,12 @@ class Filesystem {
* well-defined.
* */
if (bucket_exists) {
size_t orig = stat.st_size;
size_t bkt_size = stat.st_bkid->GetTotalBlobSize();
stat.st_size = std::max(bkt_size, stat.st_size);
stat.st_size = std::max(bkt_size, orig);
LOG(INFO) << "Since bucket exists, should reset its size to: "
<< stat.st_size << std::endl;
<< bkt_size << " or " << orig
<< ", winner: " << stat.st_size << std::endl;
}
if (stat.is_append) {
stat.st_ptr = stat.st_size;
Expand Down
5 changes: 4 additions & 1 deletion adapter/interceptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ struct InterceptorList {
adapter_mode = AdapterMode::kBypass;
} else if (strcmp(kAdapterScratchMode, adapter_mode_str) == 0) {
adapter_mode = AdapterMode::kScratch;
} else if (strcmp(kAdapterWorkflowMode, adapter_mode_str) == 0) {
adapter_mode = AdapterMode::kWorkflow;
} else {
// TODO(hari): @errorhandling throw error.
return;
Expand Down Expand Up @@ -157,7 +159,8 @@ struct InterceptorList {
check if \a path file path persists
*/
bool Persists(std::string path) {
if (adapter_mode == AdapterMode::kDefault) {
if (adapter_mode == AdapterMode::kDefault ||
adapter_mode == AdapterMode::kWorkflow) {
if (adapter_paths.empty()) {
return true;
} else {
Expand Down
9 changes: 5 additions & 4 deletions adapter/mapper/abstract_mapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,15 @@ struct BlobPlacement {
}

/** decode \a blob_name BLOB name by splitting it into index, offset, size,
and rank. */
void DecodeBlobNamePerProc(const std::string &blob_name) {
auto str_split = hermes::adapter::StringSplit(blob_name.data(), '#');
and rank. */
void DecodeBlobNameLogEntry(const std::string &blob_name) {
auto str_split =
hermes::adapter::StringSplit(blob_name.data(), '#');
std::stringstream(str_split[0]) >> page_;
std::stringstream(str_split[1]) >> blob_off_;
std::stringstream(str_split[2]) >> blob_size_;
std::stringstream(str_split[3]) >> rank_;
std::stringstream(str_split[4]) >> rank_;
std::stringstream(str_split[4]) >> time_;
}
};

Expand Down
Loading