Skip to content

Commit

Permalink
Merge pull request #468 from lukemartinlogan/master
Browse files Browse the repository at this point in the history
Final stage-in release commits
  • Loading branch information
lukemartinlogan committed Nov 28, 2022
2 parents 3124cca + b35c906 commit cd7f515
Show file tree
Hide file tree
Showing 37 changed files with 819 additions and 566 deletions.
1 change: 0 additions & 1 deletion CMake/Testing/Temporary/CTestCostData.txt

This file was deleted.

3 changes: 0 additions & 3 deletions CMake/Testing/Temporary/LastTest.log

This file was deleted.

1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ if(CMAKE_PROJECT_NAME STREQUAL HERMES AND BUILD_TESTING)
endif()
enable_testing()
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/test)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/data_stager/test)
endif()

if(HERMES_ENABLE_STDIO_ADAPTER OR HERMES_ENABLE_POSIX_ADAPTER OR
Expand Down
3 changes: 0 additions & 3 deletions adapter/Testing/Temporary/LastTest.log

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ def _CreateH(self, namespace, includes, path):
# Create the class definition
self.h_lines.append(f"namespace hermes::adapter::{namespace} {{")
self.h_lines.append(f"")
self.h_lines.append(f"/** Pointers to the real {namespace} API */")
self.h_lines.append(f"class API {{")

# Create class function pointers
Expand All @@ -175,6 +176,8 @@ def _CreateH(self, namespace, includes, path):
self.h_lines.append(f"}};")
self.h_lines.append(f"}} // namespace hermes::adapter::{namespace}")
self.h_lines.append("")
self.h_lines.append("#undef REQUIRE_API")
self.h_lines.append("")
self.h_lines.append(f"#endif // HERMES_ADAPTER_{namespace.upper()}_H")
self.h_lines.append("")

Expand All @@ -183,23 +186,26 @@ def _CreateH(self, namespace, includes, path):

def require_api(self):
self.h_lines.append(f"#define REQUIRE_API(api_name) \\")
self.h_lines.append(f" if (real_api->api_name == nullptr) {{ \\")
self.h_lines.append(f" if (api_name == nullptr) {{ \\")
self.h_lines.append(f" LOG(FATAL) << \"HERMES Adapter failed to map symbol: \" \\")
self.h_lines.append(f" #api_name << std::endl; \\")
self.h_lines.append(f" exit(1);")
self.h_lines.append(f" std::exit(1); \\")
self.h_lines.append(f" }}")

def add_typedef(self, api):
self.h_lines.append(f"typedef {api.ret} (*{api.type})({api.get_args()});")

def add_intercept_api(self, api):
self.h_lines.append(f" {api.ret} (*{api.real_name})({api.get_args()}) = nullptr;")
self.h_lines.append(f" /** {api.real_name} */")
self.h_lines.append(f" {api.type} {api.real_name} = nullptr;")

def init_api(self, api):
self.h_lines.append(f" if (is_intercepted) {{")
self.h_lines.append(f" {api.real_name} = ({api.type})dlsym(RTLD_NEXT, \"{api.name}\");")
self.h_lines.append(f" }} else {{")
self.h_lines.append(f" {api.real_name} = ({api.type})dlsym(RTLD_DEFAULT, \"{api.name}\");")
self.h_lines.append(f" }}")
self.h_lines.append(f" REQUIRE_API({api.real_name})")

def save(self, path, text):
if path is None:
Expand Down
4 changes: 3 additions & 1 deletion adapter/adapter_generator/posix.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
Api("ssize_t pwrite64(int fd, const void *buf, size_t count, off64_t offset)"),
Api("off_t lseek(int fd, off_t offset, int whence)"),
Api("off64_t lseek64(int fd, off64_t offset, int whence)"),
Api("int __fxstat(int version, int fd, struct stat *buf)"),
Api("int __fxstat(int __ver, int __filedesc, struct stat *__stat_buf)"),
Api("int fsync(int fd)"),
Api("int close(int fd)"),
]

includes = [
"<sys/types.h>",
"<sys/stat.h>",
"<unistd.h>",
"<fcntl.h>",
"\"interceptor.h\"",
Expand Down
1 change: 1 addition & 0 deletions adapter/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ const char kPathDelimiter = ',';
const char* kAdapterDefaultMode = "DEFAULT";
const char* kAdapterBypassMode = "BYPASS";
const char* kAdapterScratchMode = "SCRATCH";
const char* kAdapterWorkflowMode = "WORKFLOW";

/**
* If the \c HERMES_STOP_DAEMON environment variable is unset or has a non-zero
Expand Down
5 changes: 3 additions & 2 deletions adapter/enumerations.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
#define HERMES_ADAPTER_ENUMERATIONS_H
enum class AdapterMode {
kDefault = 0, /**< All/given files are stored on file close or flush. */
kBypass = 1, /**< All/given files are not buffered. */
kScratch = 2 /**< All/given files are ignored on file close or flush. */
kBypass = 1, /**< All/given files are not buffered. */
kScratch = 2, /**< All/given files are ignored on file close or flush. */
kWorkflow = 3 /**< Keep data in hermes until user stages out. */
};

enum class FlushingMode {
Expand Down
40 changes: 32 additions & 8 deletions adapter/filesystem/filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,18 @@ void Filesystem::Open(AdapterStat &stat, File &f, const std::string &path) {
u8 c;
stat.st_bkid =
std::make_shared<hapi::Bucket>(path_str, mdm->GetHermes());
stat.st_bkid->Put(stat.main_lock_blob, &c, 1);
if (!stat.st_bkid->ContainsBlob(stat.main_lock_blob)) {
stat.st_bkid->Put(stat.main_lock_blob, &c, 1);
}
}
MPI_Barrier(stat.comm);
if (rank != 0) {
stat.st_bkid =
std::make_shared<hapi::Bucket>(path_str, mdm->GetHermes());
}

// Create a vbucket for storing all modified blobs (per-rank)

} else {
stat.st_bkid =
std::make_shared<hapi::Bucket>(path_str, mdm->GetHermes());
Expand Down Expand Up @@ -138,7 +143,7 @@ size_t Filesystem::Write(File &f, AdapterStat &stat, const void *ptr,
std::shared_ptr<hapi::Bucket> &bkt = stat.st_bkid;
std::string filename = bkt->GetName();
LOG(INFO) << "Write called for filename: " << filename << " on offset: "
<< stat.st_ptr << " and size: " << total_size << std::endl;
<< off << " and size: " << total_size << std::endl;

size_t ret;
auto mdm = Singleton<MetadataManager>::GetInstance();
Expand Down Expand Up @@ -484,9 +489,7 @@ size_t Filesystem::_ReadNew(BlobPlacementIter &ri) {

if (ri.opts_.dpe_ != PlacementPolicy::kNone) {
LOG(INFO) << "Placing the read blob in the hierarchy" << std::endl;
IoOptions opts(ri.opts_);
opts.seek_ = false;
opts.with_fallback_ = false;
IoOptions opts = IoOptions::PlaceInHermes(ri.opts_);
Write(ri.f_, ri.stat_,
ri.blob_.data() + ri.p_.blob_off_,
ri.p_.bucket_off_,
Expand Down Expand Up @@ -623,6 +626,7 @@ int Filesystem::Sync(File &f, AdapterStat &stat) {
return 0;
}
if (IsAsyncFlush(filename)) {
LOG(INFO) << "Asynchronous flushing enabled" << std::endl;
stat.st_vbkt->WaitForBackgroundFlush();
return 0;
}
Expand Down Expand Up @@ -654,6 +658,23 @@ int Filesystem::Sync(File &f, AdapterStat &stat) {
return _RealSync(f);
}

void SaveModifiedBlobSet(AdapterStat &stat) {
int rank = 0;
auto mdm = Singleton<MetadataManager>::GetInstance();
if (mdm->is_mpi) {
MPI_Comm_rank(stat.comm, &rank);
}
auto filename = stat.st_bkid->GetName();
const auto &blob_names = stat.st_blobs;
std::string vbucket_name = filename + "#" +
std::to_string(rank) + "#sync";
hapi::VBucket file_vbucket(vbucket_name, mdm->GetHermes());
auto offset_map = std::unordered_map<std::string, hermes::u64>();
for (const auto &blob_name : blob_names) {
file_vbucket.Link(blob_name, filename);
}
}

int Filesystem::Close(File &f, AdapterStat &stat, bool destroy) {
hapi::Context ctx;
int rank = 0;
Expand All @@ -678,19 +699,22 @@ int Filesystem::Close(File &f, AdapterStat &stat, bool destroy) {
}
MPI_Barrier(stat.comm);
}
if (INTERCEPTOR_LIST->adapter_mode == AdapterMode::kScratch) {
if (INTERCEPTOR_LIST->adapter_mode == AdapterMode::kScratch ||
INTERCEPTOR_LIST->adapter_mode == AdapterMode::kWorkflow) {
destroy = false;
}

Sync(f, stat);
Sync(f, stat); // TODO(llogan): should wait until hermes destroyed to flush
auto filename = stat.st_bkid->GetName();
if (mdm->is_mpi) { MPI_Barrier(stat.comm); }
if (IsAsyncFlush(filename)) {
stat.st_vbkt->Destroy();
}
mdm->Delete(f);
if (mdm->is_mpi) { MPI_Barrier(stat.comm); }
if (destroy) { stat.st_bkid->Destroy(ctx); }
if (destroy) {
stat.st_bkid->Destroy(ctx);
}
if (stat.amode & MPI_MODE_DELETE_ON_CLOSE) {
stdfs::remove(filename);
}
Expand Down
18 changes: 16 additions & 2 deletions adapter/filesystem/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,18 @@ struct IoOptions {
opts.seek_ = seek;
return opts;
}

/**
* Ensure that I/O goes only to Hermes, and does not fall back to PFS.
*
* @param orig_opts The original options to modify
* */
static IoOptions PlaceInHermes(IoOptions &orig_opts) {
IoOptions opts(orig_opts);
opts.seek_ = false;
opts.with_fallback_ = false;
return opts;
}
};

/**
Expand Down Expand Up @@ -337,10 +349,12 @@ class Filesystem {
* well-defined.
* */
if (bucket_exists) {
size_t orig = stat.st_size;
size_t bkt_size = stat.st_bkid->GetTotalBlobSize();
stat.st_size = std::max(bkt_size, stat.st_size);
stat.st_size = std::max(bkt_size, orig);
LOG(INFO) << "Since bucket exists, should reset its size to: "
<< stat.st_size << std::endl;
<< bkt_size << " or " << orig
<< ", winner: " << stat.st_size << std::endl;
}
if (stat.is_append) {
stat.st_ptr = stat.st_size;
Expand Down
5 changes: 4 additions & 1 deletion adapter/interceptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ struct InterceptorList {
adapter_mode = AdapterMode::kBypass;
} else if (strcmp(kAdapterScratchMode, adapter_mode_str) == 0) {
adapter_mode = AdapterMode::kScratch;
} else if (strcmp(kAdapterWorkflowMode, adapter_mode_str) == 0) {
adapter_mode = AdapterMode::kWorkflow;
} else {
// TODO(hari): @errorhandling throw error.
return;
Expand Down Expand Up @@ -157,7 +159,8 @@ struct InterceptorList {
check if \a path file path persists
*/
bool Persists(std::string path) {
if (adapter_mode == AdapterMode::kDefault) {
if (adapter_mode == AdapterMode::kDefault ||
adapter_mode == AdapterMode::kWorkflow) {
if (adapter_paths.empty()) {
return true;
} else {
Expand Down
9 changes: 5 additions & 4 deletions adapter/mapper/abstract_mapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,15 @@ struct BlobPlacement {
}

/** decode \a blob_name BLOB name by splitting it into index, offset, size,
and rank. */
void DecodeBlobNamePerProc(const std::string &blob_name) {
auto str_split = hermes::adapter::StringSplit(blob_name.data(), '#');
and rank. */
void DecodeBlobNameLogEntry(const std::string &blob_name) {
auto str_split =
hermes::adapter::StringSplit(blob_name.data(), '#');
std::stringstream(str_split[0]) >> page_;
std::stringstream(str_split[1]) >> blob_off_;
std::stringstream(str_split[2]) >> blob_size_;
std::stringstream(str_split[3]) >> rank_;
std::stringstream(str_split[4]) >> rank_;
std::stringstream(str_split[4]) >> time_;
}
};

Expand Down
Loading

0 comments on commit cd7f515

Please sign in to comment.