diff --git a/adapter/src/hermes/adapter/stdio/common/constants.h b/adapter/src/hermes/adapter/stdio/common/constants.h
index c03d5bf42..ceba40aa5 100644
--- a/adapter/src/hermes/adapter/stdio/common/constants.h
+++ b/adapter/src/hermes/adapter/stdio/common/constants.h
@@ -16,10 +16,12 @@
 /**
  * Standard header
  */
+#include <cstdlib>
 
 /**
  * Dependent library header
  */
+#include "glog/logging.h"
 
 /**
  * Internal header
@@ -35,10 +37,29 @@ using hermes::adapter::stdio::MapperType;
  * Which mapper to be used by STDIO adapter.
  */
const MapperType kMapperType = MapperType::BALANCED;
+
 /**
  * Define kPageSize for balanced mapping.
  */
-const size_t kPageSize = 1024 * 1024;
+const size_t kPageSize = []() {
+  const char *kPageSizeVar = "HERMES_PAGE_SIZE";
+  const size_t kDefaultPageSize = 1 * 1024 * 1024;
+
+  size_t result = kDefaultPageSize;
+  char *page_size = getenv(kPageSizeVar);
+
+  if (page_size) {
+    result = (size_t)std::strtoull(page_size, NULL, 0);
+    if (result == 0) {
+      LOG(FATAL) << "Invalid value of " << kPageSizeVar << ": " << page_size;
+    }
+  }
+
+  LOG(INFO) << "Stdio adapter page size: " << result << "\n";
+
+  return result;
+}();
+
 /**
  * String delimiter
  */
diff --git a/adapter/src/hermes/adapter/stdio/stdio.cc b/adapter/src/hermes/adapter/stdio/stdio.cc
index ecf58a0ba..74d3153cd 100644
--- a/adapter/src/hermes/adapter/stdio/stdio.cc
+++ b/adapter/src/hermes/adapter/stdio/stdio.cc
@@ -248,7 +248,16 @@ void ReadGap(const std::string &filename, size_t seek_offset, u8 *read_ptr,
 void PutWithStdioFallback(AdapterStat &stat, const std::string &blob_name,
                           const std::string &filename, u8 *data, size_t size,
                           size_t offset) {
-  hapi::Status status = stat.st_bkid->Put(blob_name, data, size);
+  hapi::Context ctx;
+  const char *hermes_write_only = getenv("HERMES_WRITE_ONLY");
+
+  if (hermes_write_only && hermes_write_only[0] == '1') {
+    // Custom DPE for write-only apps like VPIC
+    ctx.rr_retry = true;
+    ctx.disable_swap = true;
+  }
+
+  hapi::Status status = stat.st_bkid->Put(blob_name, data, size, ctx);
   if (status.Failed()) {
     LOG(WARNING) << "Failed to Put Blob " << blob_name << " to Bucket "
                  << filename << ". Falling back to stdio." << std::endl;
diff --git a/benchmarks/CMakeLists.txt b/benchmarks/CMakeLists.txt
index 476c34133..5254ed6b0 100644
--- a/benchmarks/CMakeLists.txt
+++ b/benchmarks/CMakeLists.txt
@@ -3,7 +3,7 @@ include_directories(
   ${PROJECT_SOURCE_DIR}/test
 )
 
-set(BENCHMARKS mdm_bench dpe_bench)
+set(BENCHMARKS mdm_bench dpe_bench vpic_bench)
 
 foreach(benchmark ${BENCHMARKS})
   add_executable(${benchmark} ${benchmark}.cc)
diff --git a/benchmarks/dpe_bench.cc b/benchmarks/dpe_bench.cc
index 7adb789d7..f5ed581e0 100644
--- a/benchmarks/dpe_bench.cc
+++ b/benchmarks/dpe_bench.cc
@@ -140,7 +140,7 @@ int main(int argc, char **argv) {
     case api::PlacementPolicy::kRoundRobin: {
       time_point start_tm = now();
       result = RoundRobinPlacement(blob_sizes, tgt_state.bytes_available,
-                                   output_tmp, targets);
+                                   output_tmp, targets, false);
       std::cout << "DPE benchmark uses RoundRobin placement.\n\n";
       time_point end_tm = now();
       dpe_seconds = std::chrono::duration<double>(end_tm - start_tm).count();
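Note: two new environment knobs are introduced above. HERMES_PAGE_SIZE overrides the balanced mapper's page size when the adapter loads, and HERMES_WRITE_ONLY=1 switches PutWithStdioFallback to the retry/no-swap Context. Because the page size is parsed with std::strtoull(..., NULL, 0), hex and octal spellings are accepted as well as decimal. A standalone sketch of that parsing behavior (hypothetical values, no glog dependency):

    #include <cstdio>
    #include <cstdlib>

    int main() {
      // Base 0 lets strtoull infer the radix from the prefix:
      // "0x" -> hex, leading "0" -> octal, otherwise decimal.
      const char *inputs[] = {"1048576", "0x100000", "04000000"};
      for (const char *s : inputs) {
        std::printf("%s -> %zu\n", s, (size_t)std::strtoull(s, NULL, 0));
      }
      return 0;  // all three lines print 1048576
    }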
diff --git a/benchmarks/vpic_bench.cc b/benchmarks/vpic_bench.cc
new file mode 100644
index 000000000..2549bb625
--- /dev/null
+++ b/benchmarks/vpic_bench.cc
@@ -0,0 +1,557 @@
+/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
+ * Distributed under BSD 3-Clause license.                                   *
+ * Copyright by The HDF Group.                                               *
+ * Copyright by the Illinois Institute of Technology.                        *
+ * All rights reserved.                                                      *
+ *                                                                           *
+ * This file is part of Hermes. The full Hermes copyright notice, including  *
+ * terms governing use, modification, and redistribution, is contained in    *
+ * the COPYING file, which can be found at the top directory. If you do not  *
+ * have access to the file, you may request a copy from help@hdfgroup.org.   *
+ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
+
+#include <assert.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <chrono>
+#include <numeric>
+#include <thread>
+#include <vector>
+
+#include "mpi.h"
+#include "hermes.h"
+#include "bucket.h"
+#include "test_utils.h"
+
+namespace hapi = hermes::api;
+using u8 = hermes::u8;
+using std::chrono::duration;
+const auto now = std::chrono::high_resolution_clock::now;
+
+const int kXdim = 64;
+
+const int kDefaultSleepSeconds = 0;
+const size_t kDefaultDataSizeMB = 8;
+const size_t kDefaultIoSizeMB = kDefaultDataSizeMB;
+const char *kDefaultOutputPath = "./";
+const int kDefaultIterations = 1;
+const char *kDefaultDpePolicy = "none";
+const bool kDefaultSharedBucket = true;
+const bool kDefaultDoPosixIo = true;
+const bool kDefaultDirectIo = false;
+const bool kDefaultSync = false;
+const bool kDefaultVerifyResults = false;
+
+struct Options {
+  size_t data_size_mb;
+  size_t io_size_mb;
+  int num_iterations;
+  int sleep_seconds;
+  std::string output_path;
+  std::string dpe_policy;
+  bool shared_bucket;
+  bool do_posix_io;
+  bool direct_io;
+  bool sync;
+  bool verify_results;
+};
+
+struct Timing {
+  double fopen_time;
+  double fwrite_time;
+  double fclose_time;
+  double compute_time;
+  double total_time;
+};
+
+void GetDefaultOptions(Options *options) {
+  options->sleep_seconds = kDefaultSleepSeconds;
+  options->data_size_mb = kDefaultDataSizeMB;
+  options->io_size_mb = kDefaultIoSizeMB;
+  options->output_path = kDefaultOutputPath;
+  options->num_iterations = kDefaultIterations;
+  options->dpe_policy = kDefaultDpePolicy;
+  options->shared_bucket = kDefaultSharedBucket;
+  options->do_posix_io = kDefaultDoPosixIo;
+  options->direct_io = kDefaultDirectIo;
+  options->sync = kDefaultSync;
+  options->verify_results = kDefaultVerifyResults;
+}
+
+#define HERMES_BOOL_OPTION(var) (var) ? "true" : "false"
+
+void PrintUsage(char *program) {
+  fprintf(stderr, "Usage: %s [-adfhiopstvx]\n", program);
+  fprintf(stderr, "  -a <seconds> (default %d)\n", kDefaultSleepSeconds);
+  fprintf(stderr, "     The number of seconds to sleep between each loop\n"
+                  "     iteration to simulate computation.\n");
+  fprintf(stderr, "  -d (default %s)\n", HERMES_BOOL_OPTION(kDefaultDirectIo));
+  fprintf(stderr, "     Boolean flag. Do POSIX I/O with O_DIRECT.\n");
+  fprintf(stderr, "  -f (default %s)\n", HERMES_BOOL_OPTION(kDefaultSync));
+  fprintf(stderr, "     Boolean flag. fflush and fsync after every write.\n");
+  fprintf(stderr, "  -h\n");
+  fprintf(stderr, "     Print help.\n");
+  fprintf(stderr, "  -i <iterations> (default %d)\n", kDefaultIterations);
+  fprintf(stderr, "     The number of times to run the VPIC I/O kernel.\n");
+  fprintf(stderr, "  -o <path> (default %s)\n", kDefaultOutputPath);
+  fprintf(stderr, "     The path to an output directory; each rank writes a\n"
+                  "     file called 'vpic_posix_<rank>.out' there.\n");
+  fprintf(stderr, "  -p <size_mb> (default %zu)\n", kDefaultDataSizeMB);
+  fprintf(stderr, "     The size of particle data in MiB for each variable.\n");
+  fprintf(stderr, "  -s (default %s)\n",
+          HERMES_BOOL_OPTION(kDefaultSharedBucket));
+  fprintf(stderr, "     Boolean flag. By default all ranks share a single\n"
+                  "     Bucket; pass -s to give each rank its own Bucket.\n");
+  fprintf(stderr, "  -t <size_mb> (default is the value of -p)\n");
+  fprintf(stderr, "     The size of each I/O. Must evenly divide the total\n"
+                  "     data_size_mb (-p). I/O is done in a loop with\n"
+                  "     data_size_mb/io_size_mb iterations.\n");
+  fprintf(stderr, "  -v (default %s)\n",
+          HERMES_BOOL_OPTION(kDefaultVerifyResults));
+  fprintf(stderr, "     Boolean flag. If enabled, read the written results\n"
+                  "     and verify that they match what's expected.\n");
+  fprintf(stderr, "  -x (default %s)\n", HERMES_BOOL_OPTION(kDefaultDoPosixIo));
+  fprintf(stderr, "     Boolean flag. If given, I/O goes through Hermes\n"
+                  "     instead of POSIX.\n");
+}
+
+Options HandleArgs(int argc, char **argv) {
+  Options result = {};
+  GetDefaultOptions(&result);
+
+  bool io_size_provided = false;
+  int option = -1;
+
+  while ((option = getopt(argc, argv, "a:dfhi:o:p:st:vx")) != -1) {
+    switch (option) {
+      case 'a': {
+        result.sleep_seconds = atoi(optarg);
+        break;
+      }
+      case 'd': {
+        result.direct_io = true;
+        break;
+      }
+      case 'f': {
+        result.sync = true;
+        break;
+      }
+      case 'h': {
+        PrintUsage(argv[0]);
+        exit(0);
+      }
+      case 'i': {
+        result.num_iterations = atoi(optarg);
+        break;
+      }
+      case 'o': {
+        result.output_path = optarg;
+        break;
+      }
+      case 'p': {
+        result.data_size_mb = (size_t)std::stoull(optarg);
+        break;
+      }
+      case 's': {
+        result.shared_bucket = false;
+        break;
+      }
+      case 't': {
+        result.io_size_mb = (size_t)std::stoull(optarg);
+        io_size_provided = true;
+        break;
+      }
+      case 'v': {
+        result.verify_results = true;
+        break;
+      }
+      case 'x': {
+        result.do_posix_io = false;
+        break;
+      }
+      default: {
+        PrintUsage(argv[0]);
+        exit(1);
+      }
+    }
+  }
+
+  if (optind < argc) {
+    fprintf(stderr, "non-option ARGV-elements: ");
+    while (optind < argc) {
+      fprintf(stderr, "%s ", argv[optind++]);
+    }
+    fprintf(stderr, "\n");
+  }
+
+  if (!io_size_provided) {
+    result.io_size_mb = result.data_size_mb;
+  }
+
+  if (MEGABYTES(result.data_size_mb) % MEGABYTES(result.io_size_mb) != 0) {
+    fprintf(stderr,
+            "data_size_mb (-p) must be a multiple of io_size_mb (-t)\n");
+    exit(1);
+  }
+
+  return result;
+}
+
+static inline double uniform_random_number() {
+  return (((double)rand()) / ((double)(RAND_MAX)));
+}
+
+static void DoFwrite(void *data, size_t size, FILE *f) {
+  size_t bytes_written = fwrite(data, 1, size, f);
+  CHECK_EQ(bytes_written, size);
+}
+
+static void DoWrite(float *data, size_t size, int fd) {
+  ssize_t bytes_written = write(fd, data, size);
+  CHECK_EQ(bytes_written, size);
+}
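Note: as a concrete reading of the -p/-t constraint enforced at the end of HandleArgs, a hypothetical run such as `mpirun -n 4 ./vpic_bench -p 8 -t 2 -a 1` gives each rank 8 MiB of particle data written as data_size_mb/io_size_mb = 4 fwrites of 2 MiB each, with a 1-second simulated compute phase before each write. A -t value that does not evenly divide -p (say `-p 8 -t 3`) exits with the error above.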
+#if 0
+static void InitParticles(const int x_dim, const int y_dim, const int z_dim,
+                          std::vector<int> &id1, std::vector<int> &id2,
+                          std::vector<float> &x, std::vector<float> &y,
+                          std::vector<float> &z, std::vector<float> &px,
+                          std::vector<float> &py, std::vector<float> &pz) {
+  size_t num_particles = id1.size();
+
+  std::iota(id1.begin(), id1.end(), 0);
+
+  for (size_t i = 0; i < num_particles; ++i) {
+    id2[i] = id1[i] * 2;
+  }
+
+  std::generate(x.begin(), x.end(), [&x_dim]() {
+    return uniform_random_number() * x_dim;
+  });
+
+  std::generate(y.begin(), y.end(), [&y_dim]() {
+    return uniform_random_number() * y_dim;
+  });
+
+  for (size_t i = 0; i < num_particles; ++i) {
+    z[i] = ((double)id1[i] / num_particles) * z_dim;
+  }
+
+  std::generate(px.begin(), px.end(), [&x_dim]() {
+    return uniform_random_number() * x_dim;
+  });
+
+  std::generate(py.begin(), py.end(), [&y_dim]() {
+    return uniform_random_number() * y_dim;
+  });
+
+  for (size_t i = 0; i < num_particles; ++i) {
+    pz[i] = ((double)id2[i] / num_particles) * z_dim;
+  }
+}
+#endif
+
+double GetMPIAverage(double rank_seconds, int num_ranks, MPI_Comm comm) {
+  double total_secs = 0;
+  MPI_Reduce(&rank_seconds, &total_secs, 1, MPI_DOUBLE, MPI_SUM, 0, comm);
+  double result = total_secs / num_ranks;
+
+  return result;
+}
+
+double GetMPIAverage(double rank_seconds, int num_ranks, MPI_Comm *comm) {
+  double result = GetMPIAverage(rank_seconds, num_ranks, *comm);
+
+  return result;
+}
+
+#if 0
+double GetBandwidth(const Options &options, double total_elapsed,
+                    MPI_Comm comm) {
+  double avg_total_seconds = GetMPIAverage(total_elapsed, 8, comm);
+  double total_mb =
+      options.data_size_mb * kDefaultNumVariables * options.num_iterations;
+  double result = total_mb / avg_total_seconds;
+
+  return result;
+}
+
+void RunHermesBench(Options &options, float *data) {
+  double bandwidth = 0;
+  std::shared_ptr<hapi::Hermes> hermes = nullptr;
+
+  if (options.config_path.size() != 0) {
+    hermes = hapi::InitHermes(options.config_path.c_str());
+  } else {
+    hermes::Config config = {};
+    hermes::InitDefaultConfig(&config);
+    config.num_devices = 1;
+    config.num_targets = 1;
+    config.capacities[0] = 128 * 1024 * 1024;
+    config.system_view_state_update_interval_ms = 500;
+
+    hermes = hermes::InitHermes(&config);
+  }
+
+  if (hermes->IsApplicationCore()) {
+    int my_rank = hermes->GetProcessRank();
+    MPI_Comm *comm = (MPI_Comm *)hermes->GetAppCommunicator();
+    options.num_nodes = hermes->rpc_.num_nodes;
+
+    std::string bucket_name = "vpic_";
+
+    if (options.shared_bucket) {
+      bucket_name += "shared";
+    } else {
+      bucket_name += std::to_string(my_rank);
+    }
+
+    std::string blob_name = "data_" + std::to_string(my_rank);
+    size_t total_bytes = MEGABYTES(options.data_size_mb);
+
+    constexpr int kNumPolicies = 3;
+
+    const std::array<hapi::PlacementPolicy, kNumPolicies> policies = {
+      hapi::PlacementPolicy::kRandom,
+      hapi::PlacementPolicy::kRoundRobin,
+      hapi::PlacementPolicy::kMinimizeIoTime
+    };
+
+    const std::array<std::string, kNumPolicies> policy_strings = {
+      "random",
+      "round_robin",
+      "minimize_io_time"
+    };
+
+    for (int p = 0; p < kNumPolicies; ++p) {
+      hermes::testing::Timer timer;
+      hapi::Context ctx;
+      ctx.policy = policies[p];
+      options.dpe_policy = policy_strings[p];
+
+      hapi::Bucket bucket(bucket_name, hermes, ctx);
+
+      hermes->AppBarrier();
+
+      for (int i = 0; i < options.num_iterations; ++i) {
+        timer.resumeTime();
+        CHECK(bucket.Put(blob_name, (u8*)data, total_bytes).Succeeded());
+        timer.pauseTime();
+
+        // NOTE(chogan): Simulate computation and let SystemViewState update
+        std::this_thread::sleep_for(
+            std::chrono::duration<double, std::milli>(500));
+
+        // TODO(chogan): Investigate crash when these barriers aren't here
+        hermes->AppBarrier();
+        CHECK(bucket.DeleteBlob(blob_name).Succeeded());
+        hermes->AppBarrier();
+      }
+
+      hermes->AppBarrier();
+
+      if (options.shared_bucket) {
+        if (my_rank != 0) {
+          bucket.Release();
+        }
+        hermes->AppBarrier();
+        if (my_rank == 0) {
+          bucket.Destroy();
+        }
+      } else {
+        // TODO(chogan): Investigate why refcount is sometimes > 1
+        bucket.Destroy();
+      }
+
+      bandwidth = GetBandwidth(options, timer.getElapsedTime(), *comm);
+
+      if (my_rank == 0) {
+        std::string buffering = "hermes";
+        PrintResults(options, bandwidth, buffering);
+      }
+      hermes->AppBarrier();
+    }
+  } else {
+    // Hermes core. No user code.
+  }
+
+  hermes->Finalize();
+}
+#endif
+
+void PrintResults(const Options &options, const Timing &timing,
+                  double bandwidth) {
+  double compute_percentage = timing.compute_time / timing.total_time;
+  printf("%zu,%zu,%f,%f,%f,%f,%f,%f,%f\n", options.data_size_mb,
+         options.io_size_mb, timing.fopen_time, timing.fwrite_time,
+         timing.fclose_time, timing.total_time, timing.compute_time,
+         bandwidth, compute_percentage);
+}
+
+std::string GetOutputPath(const std::string &output_path, int rank) {
+  std::string output_file = "vpic_posix_" + std::to_string(rank) + ".out";
+  int last_char_index = output_path.size() > 0 ? output_path.size() - 1 : 0;
+  std::string maybe_slash = output_path[last_char_index] == '/' ? "" : "/";
+  std::string result = output_path + maybe_slash + output_file;
+
+  return result;
+}
+
+Timing RunPosixBench(Options &options, float *x, int rank) {
+  Timing result = {};
+
+  hermes::testing::Timer fopen_timer;
+  hermes::testing::Timer fwrite_timer;
+  hermes::testing::Timer fclose_timer;
+  hermes::testing::Timer compute_timer;
+  hermes::testing::Timer timer;
+
+  for (int i = 0; i < options.num_iterations; ++i) {
+    std::string output_path = GetOutputPath(options.output_path, rank);
+
+    if (options.direct_io) {
+      fopen_timer.resumeTime();
+      int fd = open(output_path.c_str(),
+                    O_WRONLY | O_CREAT | O_TRUNC | O_DIRECT | O_SYNC,
+                    S_IRUSR | S_IWUSR);
+      fopen_timer.pauseTime();
+      MPI_Barrier(MPI_COMM_WORLD);
+
+      CHECK_GT(fd, 0);
+      fwrite_timer.resumeTime();
+      DoWrite(x, MEGABYTES(options.data_size_mb), fd);
+      fwrite_timer.pauseTime();
+      MPI_Barrier(MPI_COMM_WORLD);
+
+      fclose_timer.resumeTime();
+      CHECK_EQ(close(fd), 0);
+      fclose_timer.pauseTime();
+      MPI_Barrier(MPI_COMM_WORLD);
+    } else {
+      fopen_timer.resumeTime();
+      FILE *f = fopen(output_path.c_str(), "w");
+      fopen_timer.pauseTime();
+      CHECK(f);
+      MPI_Barrier(MPI_COMM_WORLD);
+
+      size_t num_ios = options.data_size_mb / options.io_size_mb;
+      size_t byte_offset = 0;
+      size_t byte_increment = MEGABYTES(options.data_size_mb) / num_ios;
+
+      for (size_t iter = 0; iter < num_ios; ++iter) {
+        auto sleep_seconds = std::chrono::seconds(options.sleep_seconds);
+        compute_timer.resumeTime();
+        std::this_thread::sleep_for(sleep_seconds);
+        compute_timer.pauseTime();
+
+        fwrite_timer.resumeTime();
+        void *write_start = (hermes::u8 *)x + byte_offset;
+        DoFwrite(write_start, byte_increment, f);
+
+        if (options.sync) {
+          CHECK_EQ(fflush(f), 0);
+          CHECK_EQ(fsync(fileno(f)), 0);
+        }
+        fwrite_timer.pauseTime();
+        byte_offset += byte_increment;
+        MPI_Barrier(MPI_COMM_WORLD);
+      }
+
+      fclose_timer.resumeTime();
+      CHECK_EQ(fclose(f), 0);
+      fclose_timer.pauseTime();
+      MPI_Barrier(MPI_COMM_WORLD);
+    }
+  }
+
+  result.fopen_time = fopen_timer.getElapsedTime();
+  result.fwrite_time = fwrite_timer.getElapsedTime();
+  result.fclose_time = fclose_timer.getElapsedTime();
+  result.compute_time = compute_timer.getElapsedTime();
+  result.total_time = timer.getElapsedTime();
+
+  return result;
+}
+
+void CheckResults(float *data, size_t num_elements,
+                  const std::string &results_base, int rank) {
+  std::string results_path = GetOutputPath(results_base, rank);
+
+  FILE *f = fopen(results_path.c_str(), "r");
+  CHECK(f);
+
+  std::vector<float> read_data(num_elements);
+  size_t bytes_read = fread(read_data.data(), 1,
+                            num_elements * sizeof(float), f);
+  CHECK_EQ(bytes_read, num_elements * sizeof(float));
+
+  for (size_t i = 0; i < num_elements; ++i) {
+    Assert(data[i] == read_data[i]);
+  }
+
+  CHECK_EQ(fclose(f), 0);
+}
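Note on the O_DIRECT path in RunPosixBench: direct I/O generally requires the user buffer and transfer size to be aligned to the device's logical sector size, which is why main() below allocates the particle buffer with aligned_alloc on a 512-byte boundary. A minimal sketch of that pattern, assuming a 512-byte sector:

    #include <cstdlib>

    int main() {
      const size_t kSectorSize = 512;
      const size_t kBytes = 8 * 1024 * 1024;  // already a multiple of kSectorSize
      // C11/C++17 aligned_alloc requires size to be a multiple of the alignment.
      float *buf = (float *)aligned_alloc(kSectorSize, kBytes);
      // ... write(fd, buf, kBytes) on a descriptor opened with O_DIRECT ...
      free(buf);
      return 0;
    }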
+
+int main(int argc, char* argv[]) {
+  Options options = HandleArgs(argc, argv);
+
+  MPI_Init(&argc, &argv);
+
+  int rank = 0;
+  int comm_size = 0;
+  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+  MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
+
+  size_t num_elements = MEGABYTES(options.data_size_mb) / sizeof(float);
+  const int kSectorSize = 512;
+  float *data = (float *)aligned_alloc(kSectorSize,
+                                       num_elements * sizeof(float));
+
+  for (size_t i = 0; i < num_elements; ++i) {
+    data[i] = uniform_random_number() * kXdim;
+  }
+
+  Timing local_results = {};
+
+  if (options.do_posix_io) {
+    hermes::testing::Timer timer;
+    timer.resumeTime();
+    local_results = RunPosixBench(options, data, rank);
+    local_results.total_time = timer.pauseTime();
+  } else {
+    // RunHermesBench(options, data);
+  }
+
+  if (options.verify_results) {
+    CheckResults(data, num_elements, options.output_path, rank);
+  }
+
+  Timing combined_results = {};
+  MPI_Reduce(&local_results.total_time, &combined_results.total_time, 1,
+             MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
+  MPI_Reduce(&local_results.fwrite_time, &combined_results.fwrite_time, 1,
+             MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
+  MPI_Reduce(&local_results.fopen_time, &combined_results.fopen_time, 1,
+             MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
+  MPI_Reduce(&local_results.fclose_time, &combined_results.fclose_time, 1,
+             MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
+  MPI_Reduce(&local_results.compute_time, &combined_results.compute_time, 1,
+             MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
+
+  if (rank == 0) {
+    double total_mb = options.data_size_mb * options.num_iterations * comm_size;
+    double bandwidth_mbps = total_mb / combined_results.total_time;
+    PrintResults(options, combined_results, bandwidth_mbps);
+  }
+
+  free(data);
+
+  MPI_Finalize();
+
+  return 0;
+}
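Note: the rank-0 report aggregates each timing with MPI_MAX, so the printed bandwidth is pessimistic by design: total_mb counts every rank's data while total_time is the slowest rank's. For example, 4 ranks each writing 8 MiB for 1 iteration with a slowest total_time of 0.5 s report 4 * 8 * 1 / 0.5 = 64 MiB/s.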
diff --git a/src/api/bucket.h b/src/api/bucket.h
index b71b587a6..35a4c247e 100644
--- a/src/api/bucket.h
+++ b/src/api/bucket.h
@@ -103,20 +103,28 @@ class Bucket {
    *
    */
  template <typename T>
-  Status Put(std::vector<std::string> &names,
-             std::vector<std::vector<T>> &blobs, const Context &ctx);
+  Status Put(const std::vector<std::string> &names,
+             const std::vector<std::vector<T>> &blobs, const Context &ctx);
 
  /**
   *
   */
  template <typename T>
-  Status Put(std::vector<std::string> &names,
-             std::vector<std::vector<T>> &blobs);
+  Status Put(const std::vector<std::string> &names,
+             const std::vector<std::vector<T>> &blobs);
 
  /**
   *
   */
+  template <typename T>
+  Status PutInternal(const std::vector<std::string> &names,
+                     const std::vector<size_t> &sizes,
+                     const std::vector<std::vector<T>> &blobs,
+                     const Context &ctx);
+
+  /**
+   *
+   */
+  template <typename T>
  Status PlaceBlobs(std::vector<PlacementSchema> &schemas,
                    const std::vector<std::vector<T>> &blobs,
                    const std::vector<std::string> &names, const Context &ctx);
@@ -234,16 +242,37 @@ Status Bucket::PlaceBlobs(std::vector<PlacementSchema> &schemas,
 }
 
 template <typename T>
-Status Bucket::Put(std::vector<std::string> &names,
-                   std::vector<std::vector<T>> &blobs) {
+Status Bucket::Put(const std::vector<std::string> &names,
+                   const std::vector<std::vector<T>> &blobs) {
  Status result = Put(names, blobs, ctx_);
 
  return result;
 }
 
 template <typename T>
-Status Bucket::Put(std::vector<std::string> &names,
-                   std::vector<std::vector<T>> &blobs, const Context &ctx) {
+Status Bucket::PutInternal(const std::vector<std::string> &names,
+                           const std::vector<size_t> &sizes,
+                           const std::vector<std::vector<T>> &blobs,
+                           const Context &ctx) {
+  std::vector<PlacementSchema> schemas;
+  HERMES_BEGIN_TIMED_BLOCK("CalculatePlacement");
+  Status result = CalculatePlacement(&hermes_->context_, &hermes_->rpc_, sizes,
+                                     schemas, ctx);
+  HERMES_END_TIMED_BLOCK();
+
+  if (result.Succeeded()) {
+    result = PlaceBlobs(schemas, blobs, names, ctx);
+  } else {
+    LOG(ERROR) << result.Msg();
+  }
+
+  return result;
+}
+
+template <typename T>
+Status Bucket::Put(const std::vector<std::string> &names,
+                   const std::vector<std::vector<T>> &blobs,
+                   const Context &ctx) {
  Status ret;
 
  for (auto &name : names) {
@@ -266,17 +295,24 @@ Status Bucket::Put(std::vector<std::string> &names,
    for (size_t i = 0; i < num_blobs; ++i) {
      sizes_in_bytes[i] = blobs[i].size() * sizeof(T);
    }
-    std::vector<PlacementSchema> schemas;
-    HERMES_BEGIN_TIMED_BLOCK("CalculatePlacement");
-    ret = CalculatePlacement(&hermes_->context_, &hermes_->rpc_, sizes_in_bytes,
-                             schemas, ctx);
-    HERMES_END_TIMED_BLOCK();
-
-    if (ret.Succeeded()) {
-      ret = PlaceBlobs(schemas, blobs, names, ctx);
+
+    if (ctx.rr_retry) {
+      int num_devices =
+          GetLocalSystemViewState(&hermes_->context_)->num_devices;
+
+      for (int i = 0; i < num_devices; ++i) {
+        ret = PutInternal(names, sizes_in_bytes, blobs, ctx);
+
+        if (ret.Failed()) {
+          RoundRobinState rr_state;
+          int current = rr_state.GetCurrentDeviceIndex();
+          rr_state.SetCurrentDeviceIndex((current + 1) % num_devices);
+        } else {
+          break;
+        }
+      }
    } else {
-      LOG(ERROR) << ret.Msg();
-      return ret;
+      ret = PutInternal(names, sizes_in_bytes, blobs, ctx);
    }
  } else {
    ret = INVALID_BUCKET;
diff --git a/src/api/hermes.cc b/src/api/hermes.cc
index 6744da4c0..14354b239 100644
--- a/src/api/hermes.cc
+++ b/src/api/hermes.cc
@@ -31,6 +31,7 @@ namespace api {
 
 int Context::default_buffer_organizer_retries;
 PlacementPolicy Context::default_placement_policy;
+bool Context::default_rr_split;
 
 Status RenameBucket(const std::string &old_name,
                     const std::string &new_name,
@@ -311,6 +312,7 @@ std::shared_ptr<Hermes> InitHermes(Config *config, bool is_daemon,
  api::Context::default_buffer_organizer_retries =
      config->num_buffer_organizer_retries;
  api::Context::default_placement_policy = config->default_placement_policy;
+  api::Context::default_rr_split = config->default_rr_split;
 
  RoundRobinState::devices_.reserve(config->num_devices);
  for (DeviceID id = 0; id < config->num_devices; ++id) {
diff --git a/src/buffer_organizer.cc b/src/buffer_organizer.cc
index fcb1be89f..12bbc62c9 100644
--- a/src/buffer_organizer.cc
+++ b/src/buffer_organizer.cc
@@ -27,10 +27,6 @@ void LocalShutdownBufferOrganizer(SharedMemoryContext *context) {
  context->bo->pool.~ThreadPool();
 }
 
-void ShutdownBufferOrganizer(RpcContext *rpc) {
-  RpcCall(rpc, rpc->node_id, "BO::ShutdownBufferOrganizer");
-}
-
 void BoMove(SharedMemoryContext *context, BufferID src, TargetID dest) {
  (void)context;
  printf("%s(%d, %d)\n", __func__, (int)src.as_int, (int)dest.as_int);
@@ -243,11 +239,16 @@ void AwaitAsyncFlushingTasks(SharedMemoryContext *context, RpcContext *rpc,
                              VBucketID id) {
  auto sleep_time = std::chrono::milliseconds(500);
  int outstanding_flushes = 0;
+  int log_every = 10;
+  int counter = 0;
 
  while ((outstanding_flushes =
              GetNumOutstandingFlushingTasks(context, rpc, id)) != 0) {
-    LOG(INFO) << "Waiting for " << outstanding_flushes
-              << " outstanding flushes" << std::endl;
+    if (++counter == log_every) {
+      LOG(INFO) << "Waiting for " << outstanding_flushes
+                << " outstanding flushes" << std::endl;
+      counter = 0;
+    }
    std::this_thread::sleep_for(sleep_time);
  }
 }
diff --git a/src/buffer_organizer.h b/src/buffer_organizer.h
index 52141ebba..e8172cf59 100644
--- a/src/buffer_organizer.h
+++ b/src/buffer_organizer.h
@@ -72,7 +72,6 @@ void BoDelete(SharedMemoryContext *context, BufferID src);
 void FlushBlob(SharedMemoryContext *context, RpcContext *rpc, BlobID blob_id,
                const std::string &filename, u64 offset, bool async = false);
 void LocalShutdownBufferOrganizer(SharedMemoryContext *context);
-void ShutdownBufferOrganizer(RpcContext *rpc);
 void IncrementFlushCount(SharedMemoryContext *context, RpcContext *rpc,
                          const std::string &vbkt_name);
 void DecrementFlushCount(SharedMemoryContext *context, RpcContext *rpc,
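Note: putting the bucket.h and hermes_types.h pieces together, a client opts in to the write-only behavior per Put. A minimal single-process sketch using only calls that appear in this diff (bucket and blob names hypothetical; a real MPI app would gate on hermes->IsApplicationCore() as in the disabled RunHermesBench above):

    #include <vector>

    #include "hermes.h"
    #include "bucket.h"

    namespace hapi = hermes::api;

    int main() {
      hermes::Config config = {};
      hermes::InitDefaultConfig(&config);
      std::shared_ptr<hapi::Hermes> hermes = hermes::InitHermes(&config);

      hapi::Context ctx;
      ctx.rr_retry = true;      // walk the devices round-robin on Put failure
      ctx.disable_swap = true;  // fail fast instead of spilling to swap

      hapi::Bucket bucket("write_only_demo", hermes, ctx);
      std::vector<hermes::u8> blob(4096, 42);
      hapi::Status status = bucket.Put("blob0", blob.data(), blob.size(), ctx);
      if (status.Failed()) {
        // Every device rejected the placement and swap was disabled.
      }

      bucket.Destroy();
      hermes->Finalize();
      return 0;
    }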
diff --git a/src/buffer_pool.cc b/src/buffer_pool.cc
index 173ca22ac..dae9b4366 100644
--- a/src/buffer_pool.cc
+++ b/src/buffer_pool.cc
@@ -1722,16 +1722,20 @@ api::Status PlaceBlob(SharedMemoryContext *context, RpcContext *rpc,
    AttachBlobToBucket(context, rpc, name.c_str(), bucket_id, buffer_ids,
                       false, called_from_buffer_organizer);
  } else {
-    if (called_from_buffer_organizer) {
+    if (ctx.disable_swap) {
      result = PLACE_SWAP_BLOB_TO_BUF_FAILED;
-      LOG(ERROR) << result.Msg();
    } else {
-      SwapBlob swap_blob = PutToSwap(context, rpc, name, bucket_id, blob.data,
-                                     blob.size);
-      result = BLOB_IN_SWAP_PLACE;
-      LOG(WARNING) << result.Msg();
-      RpcCall(rpc, rpc->node_id, "BO::PlaceInHierarchy", swap_blob, name,
-              ctx);
+      if (called_from_buffer_organizer) {
+        result = PLACE_SWAP_BLOB_TO_BUF_FAILED;
+        LOG(ERROR) << result.Msg();
+      } else {
+        SwapBlob swap_blob = PutToSwap(context, rpc, name, bucket_id, blob.data,
+                                       blob.size);
+        result = BLOB_IN_SWAP_PLACE;
+        LOG(WARNING) << result.Msg();
+        RpcCall(rpc, rpc->node_id, "BO::PlaceInHierarchy", swap_blob,
+                name, ctx);
+      }
    }
  }
diff --git a/src/config_parser.cc b/src/config_parser.cc
index 699941cea..9f6ed57e1 100644
--- a/src/config_parser.cc
+++ b/src/config_parser.cc
@@ -85,6 +85,7 @@ enum ConfigVariable {
  ConfigVariable_PlacementPolicy,
  ConfigVariable_IsSharedDevice,
  ConfigVariable_BoNumThreads,
+  ConfigVariable_RRSplit,
 
  ConfigVariable_Count
 };
@@ -123,6 +124,7 @@ static const char *kConfigVariableStrings[ConfigVariable_Count] = {
  "default_placement_policy",
  "is_shared_device",
  "buffer_organizer_num_threads",
+  "default_rr_split",
 };
 
 struct Token {
@@ -843,6 +845,10 @@ void ParseTokens(TokenList *tokens, Config *config) {
        config->bo_num_threads = ParseInt(&tok);
        break;
      }
+      case ConfigVariable_RRSplit: {
+        config->default_rr_split = ParseInt(&tok);
+        break;
+      }
      default: {
        HERMES_INVALID_CODE_PATH;
        break;
diff --git a/src/data_placement_engine.cc b/src/data_placement_engine.cc
index fd1e86130..2faf6d9b5 100644
--- a/src/data_placement_engine.cc
+++ b/src/data_placement_engine.cc
@@ -148,10 +148,11 @@ void GetSplitSizes(size_t blob_size, std::vector<size_t> &output) {
  output.push_back(blob_size - blob_each_portion*(split_num-1));
 }
 
-Status RoundRobinPlacement(std::vector<size_t> &blob_sizes,
+Status RoundRobinPlacement(const std::vector<size_t> &blob_sizes,
                            std::vector<u64> &node_state,
                            std::vector<PlacementSchema> &output,
-                           const std::vector<TargetID> &targets) {
+                           const std::vector<TargetID> &targets,
+                           bool split) {
  Status result;
  std::vector<u64> ns_local(node_state.begin(), node_state.end());
 
@@ -160,8 +161,7 @@ Status RoundRobinPlacement(std::vector<size_t> &blob_sizes,
    std::mt19937 rng(dev());
    PlacementSchema schema;
 
-    // Split the blob
-    if (SplitBlob(blob_sizes[i])) {
+    if (split) {
      // Construct the vector for the splitted blob
      std::vector<size_t> new_blob_size;
      GetSplitSizes(blob_sizes[i], new_blob_size);
@@ -211,7 +211,7 @@ Status AddRandomSchema(std::multimap<u64, TargetID> &ordered_cap,
  return result;
 }
 
-Status RandomPlacement(std::vector<size_t> &blob_sizes,
+Status RandomPlacement(const std::vector<size_t> &blob_sizes,
                        std::multimap<u64, TargetID> &ordered_cap,
                        std::vector<PlacementSchema> &output) {
  Status result;
@@ -404,7 +404,7 @@ enum Topology {
 };
 
 Status CalculatePlacement(SharedMemoryContext *context, RpcContext *rpc,
-                          std::vector<size_t> &blob_sizes,
+                          const std::vector<size_t> &blob_sizes,
                           std::vector<PlacementSchema> &output,
                           const api::Context &api_context) {
  std::vector<PlacementSchema> output_tmp;
@@ -453,7 +453,7 @@ Status CalculatePlacement(SharedMemoryContext *context, RpcContext *rpc,
    }
    case api::PlacementPolicy::kRoundRobin: {
      result = RoundRobinPlacement(blob_sizes, node_state,
-                                   output_tmp, targets);
+                                   output_tmp, targets, api_context.rr_split);
      break;
    }
    case api::PlacementPolicy::kMinimizeIoTime: {
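Note: with SplitBlob() no longer consulted, splitting is purely caller-controlled via the new `split` parameter. A call shape mirroring dpe_bench.cc and dpe_roundrobin_test.cc (names as in those files; a fragment, not a full program):

    // split=false: one PlacementSchema entry per blob, targets visited
    // round-robin. split=true: each blob is first decomposed via
    // GetSplitSizes() and the pieces are placed round-robin.
    std::vector<size_t> blob_sizes = {MEGABYTES(4)};
    std::vector<PlacementSchema> schemas;
    Status result = RoundRobinPlacement(blob_sizes, node_state.bytes_available,
                                        schemas, targets, /*split=*/true);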
diff --git a/src/data_placement_engine.h b/src/data_placement_engine.h
index aa10e333a..d00fd4af6 100644
--- a/src/data_placement_engine.h
+++ b/src/data_placement_engine.h
@@ -40,12 +40,13 @@ class RoundRobinState {
  void SetCurrentDeviceIndex(int new_device_index);
 };
 
-Status RoundRobinPlacement(std::vector<size_t> &blob_sizes,
-                           std::vector<u64> &node_state,
+Status RoundRobinPlacement(const std::vector<size_t> &blob_sizes,
+                           std::vector<u64> &node_state,
                            std::vector<PlacementSchema> &output,
-                           const std::vector<TargetID> &targets);
+                           const std::vector<TargetID> &targets,
+                           bool split);
 
-Status RandomPlacement(std::vector<size_t> &blob_sizes,
+Status RandomPlacement(const std::vector<size_t> &blob_sizes,
                        std::multimap<u64, TargetID> &ordered_cap,
                        std::vector<PlacementSchema> &output);
 
@@ -56,7 +57,7 @@ Status MinimizeIoTimePlacement(const std::vector<size_t> &blob_sizes,
                                std::vector<PlacementSchema> &output);
 
 Status CalculatePlacement(SharedMemoryContext *context, RpcContext *rpc,
-                          std::vector<size_t> &blob_size,
+                          const std::vector<size_t> &blob_size,
                           std::vector<PlacementSchema> &output,
                           const api::Context &api_context);
 
diff --git a/src/hermes_types.h b/src/hermes_types.h
index be8213f0b..6448b43ec 100644
--- a/src/hermes_types.h
+++ b/src/hermes_types.h
@@ -53,12 +53,19 @@ enum class PlacementPolicy {
 struct Context {
  static int default_buffer_organizer_retries;
  static PlacementPolicy default_placement_policy;
+  static bool default_rr_split;
 
  PlacementPolicy policy;
  int buffer_organizer_retries;
+  bool rr_split;
+  bool rr_retry;
+  bool disable_swap;
 
  Context()
      : policy(default_placement_policy),
-        buffer_organizer_retries(default_buffer_organizer_retries) {}
+        buffer_organizer_retries(default_buffer_organizer_retries),
+        rr_split(default_rr_split),
+        rr_retry(false),
+        disable_swap(false) {}
 };
 }  // namespace api
 
@@ -183,6 +190,7 @@ struct Config {
  int rpc_num_threads;
  int bo_num_threads;
  api::PlacementPolicy default_placement_policy;
+  bool default_rr_split;
 
  /** A base name for the BufferPool shared memory segement. Hermes appends the
   * value of the USER environment variable to this string.
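Note on how the defaults flow: InitHermes (src/api/hermes.cc above) copies config->default_rr_split into the static Context::default_rr_split, so every Context constructed afterwards starts with that rr_split value, while rr_retry and disable_swap always start false and must be enabled per call site. A fragment:

    hapi::Context ctx;    // ctx.rr_split == Context::default_rr_split,
                          // ctx.rr_retry == false, ctx.disable_swap == false
    ctx.rr_split = true;  // override splitting for this Put only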
diff --git a/src/metadata_management.h b/src/metadata_management.h
index 217c9bc2e..7eb6906c0 100644
--- a/src/metadata_management.h
+++ b/src/metadata_management.h
@@ -432,6 +432,11 @@ bool LocalLockBlob(SharedMemoryContext *context, BlobID blob_id);
  *
  */
 bool LocalUnlockBlob(SharedMemoryContext *context, BlobID blob_id);
+
+/**
+ *
+ */
+SystemViewState *GetLocalSystemViewState(SharedMemoryContext *context);
 }  // namespace hermes
 
 #endif  // HERMES_METADATA_MANAGEMENT_H_
diff --git a/src/metadata_management_internal.h b/src/metadata_management_internal.h
index e0f19ec38..fcf5ea3fa 100644
--- a/src/metadata_management_internal.h
+++ b/src/metadata_management_internal.h
@@ -77,7 +77,6 @@ void LocalDelete(MetadataManager *mdm, const char *key, MapType map_type);
 u64 LocalGetRemainingTargetCapacity(SharedMemoryContext *context, TargetID id);
 void LocalUpdateGlobalSystemViewState(SharedMemoryContext *context,
                                       std::vector<i64> adjustments);
-SystemViewState *GetLocalSystemViewState(SharedMemoryContext *context);
 SystemViewState *GetGlobalSystemViewState(SharedMemoryContext *context);
 std::vector<u64> LocalGetGlobalDeviceCapacities(SharedMemoryContext *context);
 std::vector<u64> GetGlobalDeviceCapacities(SharedMemoryContext *context,
diff --git a/src/rpc_thallium.cc b/src/rpc_thallium.cc
index 6c25bea83..5f2e1dc0f 100644
--- a/src/rpc_thallium.cc
+++ b/src/rpc_thallium.cc
@@ -561,18 +561,12 @@ void StartBufferOrganizer(SharedMemoryContext *context, RpcContext *rpc,
    req.respond(result);
  };
 
-  auto rpc_shutdown_buffer_organizer = [context](const tl::request &req) {
-    LocalShutdownBufferOrganizer(context);
-    req.respond(true);
-  };
-
  rpc_server->define("PlaceInHierarchy",
                     rpc_place_in_hierarchy).disable_response();
  rpc_server->define("MoveToTarget",
                     rpc_move_to_target).disable_response();
  rpc_server->define("EnqueueBoTask", rpc_enqueue_bo_task);
  rpc_server->define("EnqueueFlushingTask", rpc_enqueue_flushing_task);
-  rpc_server->define("ShutdownBufferOrganizer", rpc_shutdown_buffer_organizer);
 }
 
 void StartGlobalSystemViewStateUpdateThread(SharedMemoryContext *context,
@@ -703,7 +697,7 @@ void RunDaemon(SharedMemoryContext *context, RpcContext *rpc,
  state->bo_engine->wait_for_finalize();
  state->engine->wait_for_finalize();
 
-  ShutdownBufferOrganizer(rpc);
+  LocalShutdownBufferOrganizer(context);
  delete state->engine;
  delete state->bo_engine;
  ReleaseSharedMemoryContext(context);
diff --git a/src/utils.cc b/src/utils.cc
index 962e0a4b9..beabefea3 100644
--- a/src/utils.cc
+++ b/src/utils.cc
@@ -119,6 +119,7 @@ void InitDefaultConfig(Config *config) {
  config->is_shared_device[3] = 1;
 
  config->bo_num_threads = 4;
+  config->default_rr_split = false;
 }
 
 void FailedLibraryCall(std::string func) {
diff --git a/test/config_parser_test.cc b/test/config_parser_test.cc
index 716e0e729..b559b25f1 100644
--- a/test/config_parser_test.cc
+++ b/test/config_parser_test.cc
@@ -103,5 +103,7 @@ int main(int argc, char **argv) {
 
  Assert(config.bo_num_threads == 4);
 
+  Assert(config.default_rr_split == false);
+
  return 0;
 }
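Note: the parser treats the new key as an integer boolean (ParseInt above). A config fragment enabling split mode, in the style of test/data/hermes.conf below:

    # Choose Random, RoundRobin, or MinimizeIoTime
    default_placement_policy = "RoundRobin";
    default_rr_split = 1;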
diff --git a/test/data/hermes.conf b/test/data/hermes.conf
index 79cad374f..18eb8afa2 100644
--- a/test/data/hermes.conf
+++ b/test/data/hermes.conf
@@ -106,3 +106,7 @@ buffer_pool_shmem_name = "/hermes_buffer_pool_";
 
 # Choose Random, RoundRobin, or MinimizeIoTime
 default_placement_policy = "MinimizeIoTime";
+
+# If true (1) the RoundRobin placement policy algorithm will split each Blob
+# into a random number of smaller Blobs.
+default_rr_split = 0;
diff --git a/test/dpe_roundrobin_test.cc b/test/dpe_roundrobin_test.cc
index d2879e7e3..229241649 100644
--- a/test/dpe_roundrobin_test.cc
+++ b/test/dpe_roundrobin_test.cc
@@ -32,7 +32,7 @@ void RoundRobinPlaceBlob(std::vector<size_t> &blob_sizes,
  std::vector<TargetID> targets =
      testing::GetDefaultTargets(node_state.num_devices);
  Status result = RoundRobinPlacement(blob_sizes, node_state.bytes_available,
-                                      schemas_tmp, targets);
+                                      schemas_tmp, targets, false);
 
  if (!result.Succeeded()) {
    std::cout << "\nRoundRobinPlacement failed\n" << std::flush;
    exit(1);