diff --git a/adapter/test/pubsub/pubsub_topic_test.cc b/adapter/test/pubsub/pubsub_topic_test.cc index ad818dc65..aa8de63ac 100644 --- a/adapter/test/pubsub/pubsub_topic_test.cc +++ b/adapter/test/pubsub/pubsub_topic_test.cc @@ -34,4 +34,8 @@ int main(int argc, char **argv) { auto disconnect_ret = hermes::pubsub::disconnect(); Assert(disconnect_ret.Succeeded()); + + MPI_Finalize(); + + return 0; } diff --git a/adapter/test/stdio/CMakeLists.txt b/adapter/test/stdio/CMakeLists.txt index ffec2d182..bb03f38cb 100644 --- a/adapter/test/stdio/CMakeLists.txt +++ b/adapter/test/stdio/CMakeLists.txt @@ -55,8 +55,11 @@ add_executable(hermes_stdio_low_buf_adapter_test stdio_adapter_low_buffer_space_ target_link_libraries(hermes_stdio_low_buf_adapter_test hermes_stdio) add_dependencies(hermes_stdio_low_buf_adapter_test hermes_stdio hermes_daemon) set_target_properties(hermes_stdio_low_buf_adapter_test PROPERTIES COMPILE_FLAGS "-DHERMES_INTERCEPT=1") -gcc_hermes(hermes_stdio_low_buf_adapter_test "" "" hermes_small "") -gcc_hermes(hermes_stdio_low_buf_adapter_test "" "" hermes_small async) +# TODO: The DPE doesn't respect available buffering space. In this test, it +# gives out over 1 MiB of RAM even though the RAM tier only has ~756 KiB +# available. See issue #439. 
+# gcc_hermes(hermes_stdio_low_buf_adapter_test "" "" hermes_small "") +# gcc_hermes(hermes_stdio_low_buf_adapter_test "" "" hermes_small async) add_executable(hermes_stdio_adapter_mode_test stdio_adapter_mode_test.cpp ${ADAPTER_COMMON}) target_link_libraries(hermes_stdio_adapter_mode_test hermes_stdio) diff --git a/benchmarks/CMakeLists.txt b/benchmarks/CMakeLists.txt index 7abc369b8..57ae9c3ff 100644 --- a/benchmarks/CMakeLists.txt +++ b/benchmarks/CMakeLists.txt @@ -3,7 +3,7 @@ include_directories( ${PROJECT_SOURCE_DIR}/test ) -set(BENCHMARKS mdm_bench dpe_bench vpic_bench) +set(BENCHMARKS mdm_bench dpe_bench vpic_bench borg_bench) foreach(benchmark ${BENCHMARKS}) add_executable(${benchmark} ${benchmark}.cc) diff --git a/benchmarks/borg_bench.cc b/benchmarks/borg_bench.cc new file mode 100644 index 000000000..6e8516f22 --- /dev/null +++ b/benchmarks/borg_bench.cc @@ -0,0 +1,515 @@ +/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * + * Distributed under BSD 3-Clause license. * + * Copyright by The HDF Group. * + * Copyright by the Illinois Institute of Technology. * + * All rights reserved. * + * * + * This file is part of Hermes. The full Hermes copyright notice, including * + * terms governing use, modification, and redistribution, is contained in * + * the COPYING file, which can be found at the top directory. If you do not * + * have access to the file, you may request a copy from help@hdfgroup.org. 
* + * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ + +#include +#include +#include + +#include + +#include + +#include "hermes.h" +#include "bucket.h" +#include "vbucket.h" +#include "metadata_management_internal.h" +#include "test_utils.h" + +namespace hapi = hermes::api; +using HermesPtr = std::shared_ptr; + +const int kDefaultIters = 2000; +const size_t kDefaultBlobSize = KILOBYTES(32); + +struct Options { + bool use_borg; + bool verify; + bool time_puts; + bool verbose; + bool debug; + bool write_only; + bool mixed; + long sleep_ms; + size_t blob_size; + int iters; + char *output_filename; +}; + +static void PrintUsage(char *program) { + fprintf(stderr, "Usage: %s [-bdmpvwx] [-f ]\n", program); + fprintf(stderr, " -b\n"); + fprintf(stderr, " Enable the BORG for the write-only case.\n"); + fprintf(stderr, " -d\n"); + fprintf(stderr, " If present, enable MPI breakpoint for debugging.\n"); + fprintf(stderr, " -f\n"); + fprintf(stderr, " The filename of the persisted data (for correctness" + " verification).\n"); + fprintf(stderr, " -i\n"); + fprintf(stderr, " Number of iterations (default: %d)\n", kDefaultIters); + fprintf(stderr, " -m\n"); + fprintf(stderr, " Run mixed workload.\n"); + fprintf(stderr, " -p\n"); + fprintf(stderr, " Get average for groups of puts.\n"); + fprintf(stderr, " -s\n"); + fprintf(stderr, " Sleep ms between each Put.\n"); + fprintf(stderr, " -v\n"); + fprintf(stderr, " Print verbose information.\n"); + fprintf(stderr, " -w\n"); + fprintf(stderr, " Run write only workload.\n"); + fprintf(stderr, " -x\n"); + fprintf(stderr, " If present, verify results at the end.\n"); + fprintf(stderr, " -z\n"); + fprintf(stderr, " Blob size in bytes (default: %zu).\n", kDefaultBlobSize); +} + +static Options HandleArgs(int argc, char **argv) { + Options result = {}; + result.iters = kDefaultIters; + result.blob_size = kDefaultBlobSize; + + int option = -1; + while ((option = getopt(argc, argv, "bdf:hi:mps:vxwz:")) != -1) { + 
switch (option) { + case 'h': { + PrintUsage(argv[0]); + exit(0); + } + case 'b': { + result.use_borg = true; + break; + } + case 'd': { + result.debug = true; + break; + } + case 'f': { + result.output_filename = optarg; + break; + } + case 'i': { + result.iters = strtol(optarg, NULL, 0); + break; + } + case 'm': { + result.mixed = true; + break; + } + case 'p': { + result.time_puts = true; + break; + } + case 's': { + result.sleep_ms = strtol(optarg, NULL, 0); + break; + } + case 'v': { + result.verbose = true; + break; + } + case 'x': { + result.verify = true; + break; + } + case 'w': { + result.write_only = true; + break; + } + case 'z': { + result.blob_size = strtoll(optarg, NULL, 0); + break; + } + default: { + PrintUsage(argv[0]); + exit(1); + } + } + } + + if (result.verify && !result.output_filename) { + fprintf(stderr, "Please supply filename via -f\n"); + exit(1); + } + if (optind < argc) { + fprintf(stderr, "non-option ARGV-elements: "); + while (optind < argc) { + fprintf(stderr, "%s ", argv[optind++]); + } + fprintf(stderr, "\n"); + } + return result; +} + +static double GetMPIAverage(double rank_seconds, int num_ranks, MPI_Comm comm) { + double total_secs = 0; + MPI_Allreduce(&rank_seconds, &total_secs, 1, MPI_DOUBLE, MPI_SUM, comm); + double result = total_secs / num_ranks; // valid on every rank, not only root + + return result; +} + +static double GetBandwidth(double total_elapsed, double total_mb, MPI_Comm comm, + int ranks) { + double avg_total_seconds = GetMPIAverage(total_elapsed, ranks, comm); + double result = total_mb / avg_total_seconds; + + return result; +} + +static std::string MakeBlobName(int rank, int i) { + std::string result = std::to_string(rank) + "_" + std::to_string(i); + + return result; +} + +static void WriteOnlyWorkload(const Options &options) { + HermesPtr hermes = hapi::InitHermes(getenv("HERMES_CONF")); + + if (hermes->IsApplicationCore()) { + int rank = hermes->GetProcessRank(); + const int kNumRanks = hermes->GetNumProcesses(); + const size_t kTotalBytes = 
kNumRanks * options.blob_size * options.iters; + + hermes::testing::Timer timer; + hapi::Context ctx; + // Disable swapping of Blobs + ctx.disable_swap = true; + ctx.policy = hapi::PlacementPolicy::kMinimizeIoTime; + + std::string bkt_name = "BORG_" + std::to_string(rank); + hapi::VBucket vbkt(bkt_name, hermes); + hapi::Bucket bkt(bkt_name, hermes, ctx); + + hapi::WriteOnlyTrait trait; + if (options.use_borg) { + vbkt.Attach(&trait); + } + + // MinIoTime with retry + const int kReportFrequency = 30; + hermes::testing::Timer put_timer; + size_t failed_puts = 0; + size_t failed_links = 0; + size_t retries = 0; + for (int i = 0; i < options.iters; ++i) { + std::string blob_name = MakeBlobName(rank, i); + hapi::Blob blob(options.blob_size, i % 255); + + timer.resumeTime(); + put_timer.resumeTime(); + hapi::Status status; + int consecutive_fails = 0; + while (!((status = bkt.Put(blob_name, blob)).Succeeded())) { + retries++; + if (++consecutive_fails > 10) { + failed_puts++; + break; + } + } + + if (options.use_borg && consecutive_fails <= 10) { + hapi::Status link_status = vbkt.Link(blob_name, bkt_name); + if (!link_status.Succeeded()) { + failed_links++; + } + } + + if (options.sleep_ms > 0 && i > 0 && i % kReportFrequency == 0) { + std::this_thread::sleep_for( + std::chrono::milliseconds(options.sleep_ms)); + } + + put_timer.pauseTime(); + timer.pauseTime(); + + if (options.time_puts && i > 0 && i % kReportFrequency == 0) { + Assert(kNumRanks == 1); + double total_mb = + (options.blob_size * kReportFrequency) / 1024.0 / 1024.0; + + std::cout << i << ", " << total_mb / put_timer.getElapsedTime() << "\n"; + } + hermes->AppBarrier(); + } + + Assert(failed_puts == 0); + if (options.verbose) { + std::cout << "Rank " << rank << " failed puts: " << failed_puts << "\n"; + std::cout << "Rank " << rank << " failed links: " << failed_links << "\n"; + std::cout << "Rank " << rank << " Put retries: " << retries << "\n"; + } + + hermes->AppBarrier(); + if 
(!hermes->IsFirstRankOnNode()) { + vbkt.Release(); + bkt.Release(); + } + + hermes->AppBarrier(); + if (hermes->IsFirstRankOnNode()) { + vbkt.Destroy(); + if (options.verify) { + hapi::VBucket file_vbucket(options.output_filename, hermes); + auto offset_map = std::unordered_map(); + + for (int i = 0; i < kNumRanks; ++i) { + for (int j = 0; j < options.iters; ++j) { + std::string blob_name = MakeBlobName(i, j); + file_vbucket.Link(blob_name, bkt_name, ctx); + const size_t kBytesPerRank = options.iters * options.blob_size; + size_t offset = (i * kBytesPerRank) + (j * options.blob_size); + offset_map.emplace(blob_name, offset); + } + } + bool flush_synchronously = true; + hapi::PersistTrait persist_trait(options.output_filename, offset_map, + flush_synchronously); + if (options.verbose) { + std::cout << "Flushing buffers...\n"; + } + file_vbucket.Attach(&persist_trait); + + file_vbucket.Destroy(); + } + bkt.Destroy(); + } + + hermes->AppBarrier(); + + MPI_Comm *comm = (MPI_Comm *)hermes->GetAppCommunicator(); + double total_mb = kTotalBytes / 1024.0 / 1024.0; + double bandwidth = GetBandwidth(timer.getElapsedTime(), total_mb, *comm, + kNumRanks); + + if (hermes->IsFirstRankOnNode()) { + std::cout << bandwidth << "," << kNumRanks << "," << options.use_borg + << "," << options.sleep_ms << "\n"; + } + } + + hermes->Finalize(); +} + +static void OptimizeReads(Options &options) { + HermesPtr hermes = hapi::InitHermes(getenv("HERMES_CONF")); + if (options.sleep_ms == 0) { + options.sleep_ms = 3000; + } + + if (hermes->IsApplicationCore()) { + // Optimize reads + // Fill hierarchy + // Delete all RAM Blobs + // BORG moves BB Blobs to RAM + // Read all BB Blobs at RAM BW + + using namespace hermes; // NOLINT(*) + + int rank = hermes->GetProcessRank(); + const int kNumRanks = hermes->GetNumProcesses(); + // const size_t kTotalBytes = kNumRanks * options.blob_size * options.iters; + MetadataManager *mdm = GetMetadataManagerFromContext(&hermes->context_); + std::vector 
targets(mdm->node_targets.length); + + for (u16 i = 0; i < mdm->node_targets.length; ++i) { + targets[i] = {1, i, i}; + } + + GlobalSystemViewState *gsvs = GetGlobalSystemViewState(&hermes->context_); + f32 ram_min_threshold = gsvs->bo_capacity_thresholds[0].min; + f32 nvme_min_threshold = gsvs->bo_capacity_thresholds[1].min; + + std::vector capacities = + GetRemainingTargetCapacities(&hermes->context_, &hermes->rpc_, targets); + + // See how many blobs we can fit in each Target + std::vector blobs_per_target(capacities.size()); + for (size_t i = 0; i < blobs_per_target.size(); ++i) { + blobs_per_target[i] = capacities[i] / options.blob_size; + } + + hermes::testing::Timer timer; + hapi::Context ctx; + // Disable swapping of Blobs + ctx.disable_swap = true; + ctx.policy = hapi::PlacementPolicy::kMinimizeIoTime; + + std::string bkt_name = __func__ + std::to_string(rank); + hapi::Bucket bkt(bkt_name, hermes, ctx); + + // MinIoTime with retry + hermes::testing::Timer put_timer; + size_t failed_puts = 0; + size_t retries = 0; + + // Fill hierarchy + for (size_t target_idx = 0; target_idx < blobs_per_target.size(); + ++target_idx) { + for (int i = 0; i < blobs_per_target[target_idx]; ++i) { + std::string blob_name = (std::to_string(rank) + "_" + + std::to_string(target_idx) + "_" + + std::to_string(i)); + hapi::Blob blob(options.blob_size, i % 255); + + hapi::Status status; + int consecutive_fails = 0; + + while (!((status = bkt.Put(blob_name, blob)).Succeeded())) { + retries++; + if (++consecutive_fails > 10) { + failed_puts++; + break; + } + } + } + hermes->AppBarrier(); + } + + Assert(failed_puts == 0); + if (options.verbose) { + std::cout << "Rank " << rank << " failed puts: " << failed_puts << "\n"; + std::cout << "Rank " << rank << " Put retries: " << retries << "\n"; + } + + // Delete all RAM and NVMe Blobs + for (size_t j = 0; j < blobs_per_target.size() - 1; ++j) { + for (int i = 0; i < blobs_per_target[j]; ++i) { + std::string blob_name = (std::to_string(rank) 
+ "_" + + std::to_string(j) + "_" + + std::to_string(i)); + Assert(bkt.DeleteBlob(blob_name).Succeeded()); + } + } + + // Give the BORG time to move BB Blobs to RAM and NVMe + std::this_thread::sleep_for(std::chrono::milliseconds(options.sleep_ms)); + + // Read all BB Blobs at RAM and NVMe BW + const int kBbIndex = 2; + + int blobs_to_read = blobs_per_target[kBbIndex]; + if (ram_min_threshold > 0) { + blobs_to_read = (ram_min_threshold * blobs_per_target[0] + + nvme_min_threshold * blobs_per_target[1]); + } + int stopping_index = blobs_per_target[kBbIndex] - blobs_to_read; + for (int i = blobs_per_target[kBbIndex] - 1; i > stopping_index; --i) { + std::string blob_name = (std::to_string(rank) + "_" + + std::to_string(kBbIndex) + "_" + + std::to_string(i)); + + hapi::Blob blob(options.blob_size); + timer.resumeTime(); + Assert(bkt.Get(blob_name, blob) == options.blob_size); + timer.pauseTime(); + + // Verify + hapi::Blob expected_blob(options.blob_size, i % 255); + Assert(blob == expected_blob); + } + + if (!hermes->IsFirstRankOnNode()) { + bkt.Release(); + } + hermes->AppBarrier(); + if (hermes->IsFirstRankOnNode()) { + bkt.Destroy(); + } + hermes->AppBarrier(); + + MPI_Comm *comm = (MPI_Comm *)hermes->GetAppCommunicator(); + size_t bytes_read = blobs_per_target[kBbIndex] * options.blob_size; + double total_mb = bytes_read / 1024.0 / 1024.0; + double bandwidth = GetBandwidth(timer.getElapsedTime(), total_mb, *comm, + kNumRanks); + + if (hermes->IsFirstRankOnNode()) { + std::cout << bandwidth << "," << kNumRanks << "," << options.use_borg + << "," << options.sleep_ms << "\n"; + } + } + + hermes->Finalize(); +} + +static void Verify(const Options &options) { + int my_rank; + int comm_size; + MPI_Comm_rank(MPI_COMM_WORLD, &my_rank); + MPI_Comm_size(MPI_COMM_WORLD, &comm_size); + + const size_t kAppCores = comm_size - 1; + const size_t kTotalBytes = kAppCores * options.iters * options.blob_size; + if (my_rank == 0) { + std::vector data(kTotalBytes); + + if 
(options.verbose) { + std::cout << "Verifying data\n"; + } + + FILE *f = fopen(options.output_filename, "r"); + Assert(f); + Assert(fseek(f, 0L, SEEK_END) == 0); + size_t file_size = ftell(f); + Assert(file_size == kTotalBytes); + Assert(fseek(f, 0L, SEEK_SET) == 0); + size_t result = fread(data.data(), kTotalBytes, 1, f); + Assert(result == 1); + + for (size_t rank = 0; rank < kAppCores; ++rank) { + for (int iter = 0; iter < options.iters; ++iter) { + for (size_t byte = 0; byte < options.blob_size; ++byte) { + Assert(data[(rank * options.iters * options.blob_size) + + (iter * options.blob_size) + byte] == iter % 255); + } + } + } + } +} + +static void DebugBreak() { + volatile int gdb_iii = 0; // volatile: only an attached debugger changes it + char gdb_DEBUG_hostname[256]; + gethostname(gdb_DEBUG_hostname, sizeof(gdb_DEBUG_hostname)); + printf("PID %d on %s ready for attach\n", getpid(), gdb_DEBUG_hostname); + fflush(stdout); + while (0 == gdb_iii) + sleep(5); +} + +int main(int argc, char *argv[]) { + Options options = HandleArgs(argc, argv); + + int mpi_threads_provided; + MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &mpi_threads_provided); + if (mpi_threads_provided < MPI_THREAD_MULTIPLE) { + fprintf(stderr, "Didn't receive appropriate MPI threading specification\n"); + return 1; + } + + if (options.debug) { + DebugBreak(); + } + + if (options.write_only) { + WriteOnlyWorkload(options); + } + if (options.mixed) { + OptimizeReads(options); + } + if (options.verify) { + Verify(options); + } + + MPI_Finalize(); + + return 0; +} diff --git a/benchmarks/dpe_bench.cc b/benchmarks/dpe_bench.cc index 75c6017d6..50d88faaf 100644 --- a/benchmarks/dpe_bench.cc +++ b/benchmarks/dpe_bench.cc @@ -17,6 +17,7 @@ #include "hermes.h" #include "utils.h" +#include "test_utils.h" #include "data_placement_engine.h" /* example usage: ./bin/dpe_bench -m -s 4096 */ @@ -28,6 +29,7 @@ const auto now = std::chrono::high_resolution_clock::now; const u64 dpe_total_targets = 10; const size_t dpe_total_num_blobs = 10; const size_t 
dpe_total_blob_size = GIGABYTES(10); +const size_t kDefaultBlobSize = KILOBYTES(4); void PrintUsage(char *program) { fprintf(stderr, "Usage %s [-r]\n", program); @@ -54,7 +56,7 @@ int main(int argc, char **argv) { bool fixed_total_num_blobs {true}, fixed_total_blob_size {false}; int option = -1; char *rvalue = NULL; - size_t each_blob_size; + size_t each_blob_size = kDefaultBlobSize; size_t total_placed_size; double dpe_seconds; api::Status result; @@ -84,7 +86,7 @@ int main(int argc, char **argv) { PrintUsage(argv[0]); policy = api::PlacementPolicy::kRandom; fixed_total_blob_size = true; - each_blob_size = 4096; + each_blob_size = kDefaultBlobSize; std::cout << "Using Random policy for data placement engine.\n" << "Using fixed number of blobs of size 4KB for test.\n\n"; } @@ -174,7 +176,7 @@ int main(int argc, char **argv) { for (auto schema : output_tmp) { placed_size += testing::UpdateDeviceState(schema, tgt_state); } - assert(placed_size == total_placed_size); + Assert(placed_size == total_placed_size); // Aggregate placement schemas from the same target if (result.Succeeded()) { diff --git a/benchmarks/vpic_bench.cc b/benchmarks/vpic_bench.cc index b6283ee30..b83a04969 100644 --- a/benchmarks/vpic_bench.cc +++ b/benchmarks/vpic_bench.cc @@ -489,7 +489,9 @@ void CheckResults(float *data, size_t num_elements, CHECK(f); std::vector read_data(num_elements); - fread(read_data.data(), 1, num_elements * sizeof(float), f); + size_t num_bytes = num_elements * sizeof(float); + size_t bytes_read = fread(read_data.data(), 1, num_bytes, f); + Assert(bytes_read == num_bytes); for (size_t i = 0; i < num_elements; ++i) { Assert(data[i] == read_data[i]); diff --git a/src/api/hermes.cc b/src/api/hermes.cc index 1fe2f6ce0..d53615f38 100644 --- a/src/api/hermes.cc +++ b/src/api/hermes.cc @@ -249,14 +249,14 @@ SharedMemoryContext InitHermesCore(Config *config, CommunicationContext *comm, mdm->host_names_offset = (u8 *)rpc->host_names - (u8 *)shmem_base; } else { rpc->host_numbers 
= PushArray(&arenas[kArenaType_MetaData], - config->host_numbers.size()); - for (size_t i = 0; i < config->host_numbers.size(); ++i) { + rpc->num_host_numbers); + for (size_t i = 0; i < rpc->num_host_numbers; ++i) { rpc->host_numbers[i] = config->host_numbers[i]; } mdm->host_numbers_offset = (u8 *)rpc->host_numbers - (u8 *)shmem_base; } - InitMetadataManager(mdm, &arenas[kArenaType_MetaData], config, comm->node_id); + InitMetadataManager(mdm, rpc, &arenas[kArenaType_MetaData], config); InitMetadataStorage(&context, mdm, &arenas[kArenaType_MetaData], config); ShmemClientInfo *client_info = (ShmemClientInfo *)shmem_base; @@ -381,8 +381,7 @@ std::shared_ptr InitHermes(Config *config, bool is_daemon, double sleep_ms = config->system_view_state_update_interval_ms; StartGlobalSystemViewStateUpdateThread(&result->context_, &result->rpc_, - &result->trans_arena_, - sleep_ms); + &result->trans_arena_, sleep_ms); } WorldBarrier(&comm); diff --git a/src/api/traits.cc b/src/api/traits.cc index 71b9b796a..e607c130b 100644 --- a/src/api/traits.cc +++ b/src/api/traits.cc @@ -15,10 +15,12 @@ #include #include "buffer_organizer.h" +#include "metadata_management_internal.h" namespace hermes { namespace api { -Trait::Trait(TraitID id, TraitIdArray conflict_traits, TraitType type) +Trait::Trait(TraitID id, const std::vector &conflict_traits, + TraitType type) : id(id), conflict_traits(conflict_traits), type(type), @@ -32,20 +34,20 @@ using OffsetMap = std::unordered_map; PersistTrait::PersistTrait(const std::string &filename, const OffsetMap &offset_map, bool synchronous) - : Trait(HERMES_PERSIST_TRAIT, TraitIdArray(), TraitType::PERSIST), + : Trait(HERMES_PERSIST_TRAIT, std::vector(), TraitType::PERSIST), filename(filename), offset_map(offset_map), synchronous(synchronous) { - this->onAttachFn = std::bind(&PersistTrait::onAttach, this, - std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3); - this->onDetachFn = std::bind(&PersistTrait::onDetach, this, - 
std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3); - this->onLinkFn = std::bind(&PersistTrait::onLink, this, - std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3); - this->onUnlinkFn = std::bind(&PersistTrait::onUnlink, this, - std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3); + onAttachFn = std::bind(&PersistTrait::onAttach, this, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3); + onDetachFn = std::bind(&PersistTrait::onDetach, this, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3); + onLinkFn = std::bind(&PersistTrait::onLink, this, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3); + onUnlinkFn = std::bind(&PersistTrait::onUnlink, this, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3); } void PersistTrait::onAttach(HermesPtr hermes, VBucketID id, Trait *trait) { @@ -95,5 +97,52 @@ void PersistTrait::onUnlink(HermesPtr hermes, TraitInput &input, Trait *trait) { (void)trait; } +WriteOnlyTrait::WriteOnlyTrait() + : Trait(HERMES_WRITE_ONLY_TRAIT, std::vector(), TraitType::META) { + onAttachFn = std::bind(&WriteOnlyTrait::onAttach, this, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3); + onDetachFn = std::bind(&WriteOnlyTrait::onDetach, this, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3); + onLinkFn = std::bind(&WriteOnlyTrait::onLink, this, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3); + onUnlinkFn = std::bind(&WriteOnlyTrait::onUnlink, this, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3); +} + +void WriteOnlyTrait::onAttach(HermesPtr hermes, VBucketID id, Trait *trait) { + (void)hermes; + (void)id; + (void)trait; +} + +void WriteOnlyTrait::onDetach(HermesPtr hermes, VBucketID id, Trait *trait) { + (void)hermes; + (void)id; + (void)trait; +} + +void WriteOnlyTrait::onLink(HermesPtr hermes, TraitInput 
&input, Trait *trait) { + (void)trait; + + SharedMemoryContext *context = &hermes->context_; + RpcContext *rpc = &hermes->rpc_; + BucketID bucket_id = GetBucketId(context, rpc, input.bucket_name.c_str()); + f32 epsilon = 0.1; + f32 custom_importance = 0; + hermes::OrganizeBlob(context, rpc, bucket_id, input.blob_name, epsilon, + custom_importance); +} + +void WriteOnlyTrait::onUnlink(HermesPtr hermes, TraitInput &input, + Trait *trait) { + (void)hermes; + (void)input; + (void)trait; +} + } // namespace api } // namespace hermes diff --git a/src/api/traits.h b/src/api/traits.h index 48bc567fc..5006a1179 100644 --- a/src/api/traits.h +++ b/src/api/traits.h @@ -21,6 +21,9 @@ namespace hermes { namespace api { +#define HERMES_PERSIST_TRAIT 11 +#define HERMES_WRITE_ONLY_TRAIT 12 + /** A blob's hosting bucket and blob names */ struct BlobInfo { /** The blob-hosting bucket name */ @@ -43,7 +46,7 @@ struct Trait { /** The trait's ID */ TraitID id; /** \todo ??? */ - TraitIdArray conflict_traits; + std::vector conflict_traits; /** The trait's type */ TraitType type; /** Callback for trait->vbucket attach events */ @@ -54,13 +57,14 @@ struct Trait { OnLinkCallback onLinkFn; /** Callback for blob- &conflict_traits, + TraitType type); }; -#define HERMES_PERSIST_TRAIT 11 - /** (File) Persistence trait */ struct PersistTrait : public Trait { std::string filename; @@ -74,8 +78,18 @@ struct PersistTrait : public Trait { void onAttach(HermesPtr hermes, VBucketID id, Trait *trait); void onDetach(HermesPtr hermes, VBucketID id, Trait *trait); - void onLink(HermesPtr hermes, TraitInput &blob, Trait *trait); - void onUnlink(HermesPtr hermes, TraitInput &blob, Trait *trait); + void onLink(HermesPtr hermes, TraitInput &input, Trait *trait); + void onUnlink(HermesPtr hermes, TraitInput &input, Trait *trait); +}; + +struct WriteOnlyTrait : public Trait { + WriteOnlyTrait(); + + void onAttach(HermesPtr hermes, VBucketID id, Trait *trait); + void onDetach(HermesPtr hermes, VBucketID id, 
Trait *trait); + void onLink(HermesPtr hermes, TraitInput &input, Trait *trait); + void onUnlink(HermesPtr hermes, TraitInput &input, Trait *trait); + void onGet(HermesPtr hermes, TraitInput &input, Trait *trait); }; } // namespace api diff --git a/src/api/vbucket.cc b/src/api/vbucket.cc index df375184b..f567d7c2d 100644 --- a/src/api/vbucket.cc +++ b/src/api/vbucket.cc @@ -135,16 +135,40 @@ bool VBucket::ContainsBlob(std::string blob_name, std::string bucket_name) { return ret; } -Blob& VBucket::GetBlob(std::string blob_name, std::string bucket_name) { - LOG(INFO) << "Retrieving blob " << blob_name << " from bucket " << bucket_name - << " in VBucket " << name_ << '\n'; - hermes::api::Context ctx; - Bucket bkt(bucket_name, hermes_, ctx); - local_blob = {}; - size_t blob_size = bkt.Get(blob_name, local_blob, ctx); - local_blob.resize(blob_size); - bkt.Get(blob_name, local_blob, ctx); - return local_blob; +size_t VBucket::Get(const std::string &name, Bucket &bkt, Blob &user_blob, + const Context &ctx) { + size_t ret = Get(name, bkt, user_blob.data(), user_blob.size(), ctx); + + return ret; +} + +size_t VBucket::Get(const std::string &name, Bucket &bkt, Blob &user_blob) { + size_t result = Get(name, bkt, user_blob, ctx_); + + return result; +} + +size_t VBucket::Get(const std::string &name, Bucket &bkt, void *user_blob, + size_t blob_size, const Context &ctx) { + bool is_size_query = false; + if (blob_size == 0) { // empty buffer: caller only wants the Blob's size + is_size_query = true; + } + + size_t result = bkt.Get(name, user_blob, blob_size, ctx); + + if (!is_size_query) { + TraitInput input; + input.blob_name = name; + input.bucket_name = bkt.GetName(); + for (const auto& t : attached_traits_) { + if (t->onGetFn != nullptr) { + t->onGetFn(hermes_, input, t); + } + } + } + + return result; } std::vector VBucket::GetLinks(Context& ctx) { @@ -292,7 +316,7 @@ Status VBucket::Destroy(Context& ctx) { Status result; if (IsValid()) { - // NOTE(chogan): Let all flusing tasks complete before destroying the + // 
NOTE(chogan): Let all flushing tasks complete before destroying the // VBucket. WaitForBackgroundFlush(); @@ -308,11 +332,13 @@ Status VBucket::Destroy(Context& ctx) { for (const auto& blob_id : blob_ids) { TraitInput input = {}; BucketID bucket_id = GetBucketIdFromBlobId(context, rpc, blob_id); - input.bucket_name = GetBucketNameById(context, rpc, bucket_id); - input.blob_name = GetBlobNameFromId(context, rpc, blob_id); - if (t->onUnlinkFn != nullptr) { - t->onUnlinkFn(hermes_, input, t); - // TODO(hari): @errorhandling Check if unlinking was successful + if (!IsNullBucketId(bucket_id)) { + input.bucket_name = GetBucketNameById(context, rpc, bucket_id); + input.blob_name = GetBlobNameFromId(context, rpc, blob_id); + if (t->onUnlinkFn != nullptr) { + t->onUnlinkFn(hermes_, input, t); + // TODO(hari): @errorhandling Check if unlinking was successful + } } } } diff --git a/src/api/vbucket.h b/src/api/vbucket.h index dddda13c2..fd963ab13 100644 --- a/src/api/vbucket.h +++ b/src/api/vbucket.h @@ -39,8 +39,6 @@ class VBucket { VBucketID id_; /** Traits attached to this vbucket */ std::list attached_traits_; - /** \todo What's that Bob? */ - Blob local_blob; /** internal Hermes object owned by vbucket */ std::shared_ptr hermes_; /** The Context for this VBucket. \todo Why do we need that? */ @@ -52,7 +50,6 @@ class VBucket { : name_(initial_name), id_({{0, 0}}), attached_traits_(), - local_blob(), hermes_(h), ctx_(ctx) { if (IsVBucketNameTooLong(name_)) { @@ -116,8 +113,16 @@ class VBucket { /** check if blob is in this vbucket */ bool ContainsBlob(std::string blob_name, std::string bucket_name); - /** get a blob linked to this vbucket */ - Blob &GetBlob(std::string blob_name, std::string bucket_name); + /** Get a Blob, calling any OnGet callbacks of attached Traits. + * + * Exactly like Bucket::Get, except this function invokes the OnGet callback + * of any attached Traits. 
+ */ + size_t Get(const std::string &name, Bucket &bkt, Blob &user_blob, + const Context &ctx); + size_t Get(const std::string &name, Bucket &bkt, Blob &user_blob); + size_t Get(const std::string &name, Bucket &bkt, void *user_blob, + size_t blob_size, const Context &ctx); /** retrieves the subset of blob links satisfying pred */ /** could return iterator */ diff --git a/src/buffer_organizer.cc b/src/buffer_organizer.cc index ff8b9945d..798daa1b1 100644 --- a/src/buffer_organizer.cc +++ b/src/buffer_organizer.cc @@ -16,6 +16,7 @@ #include "hermes.h" #include "buffer_organizer.h" +#include "metadata_storage.h" #include "data_placement_engine.h" namespace hermes { @@ -174,6 +175,7 @@ void LocalEnqueueBoMove(SharedMemoryContext *context, RpcContext *rpc, BoPriority priority) { ThreadPool *pool = &context->bo->pool; bool is_high_priority = priority == BoPriority::kHigh; + VLOG(1) << "BufferOrganizer queuing Blob " << blob_id.as_int; pool->run(std::bind(BoMove, context, rpc, moves, blob_id, bucket_id, internal_blob_name), is_high_priority); @@ -185,9 +187,13 @@ void LocalEnqueueBoMove(SharedMemoryContext *context, RpcContext *rpc, void BoMove(SharedMemoryContext *context, RpcContext *rpc, const BoMoveList &moves, BlobID blob_id, BucketID bucket_id, const std::string &internal_blob_name) { + VLOG(1) << "Moving blob " + << internal_blob_name.substr(kBucketIdStringSize, std::string::npos) + << std::endl; MetadataManager *mdm = GetMetadataManagerFromContext(context); - if (LocalLockBlob(context, blob_id)) { + bool got_lock = BeginReaderLock(&mdm->bucket_delete_lock); + if (got_lock && LocalLockBlob(context, blob_id)) { auto warning_string = [](BufferID id) { std::ostringstream ss; ss << "BufferID" << id.as_int << " not found on this node\n"; @@ -236,6 +242,8 @@ void BoMove(SharedMemoryContext *context, RpcContext *rpc, } if (replacement_ids.size() > 0) { + // TODO(chogan): Only need to allocate a new BufferIdList if + // replacement.size > replaced.size std::vector 
buffer_ids = LocalGetBufferIdList(mdm, blob_id); using BufferIdSet = std::unordered_set; BufferIdSet new_buffer_ids(buffer_ids.begin(), buffer_ids.end()); @@ -260,6 +268,8 @@ void BoMove(SharedMemoryContext *context, RpcContext *rpc, BlobInfo new_info = {}; BlobInfo *old_info = GetBlobInfoPtr(mdm, blob_id); new_info.stats = old_info->stats; + // Invalidate the old Blob. It will get deleted when its TicketMutex + // reaches old_info->last old_info->stop = true; ReleaseBlobInfoPtr(mdm); LocalPut(mdm, new_blob_id, new_info); @@ -273,11 +283,25 @@ void BoMove(SharedMemoryContext *context, RpcContext *rpc, if (!BlobIsInSwap(blob_id)) { LocalReleaseBuffers(context, replaced_ids); } - LocalFreeBufferIdList(context, blob_id); + // NOTE(chogan): We don't free the Blob's BufferIdList here because that + // would make the buffer_id_list_offset available for new incoming Blobs, + // and we can't reuse the buffer_id_list_offset until the old BlobInfo is + // deleted. We take care of both in LocalLockBlob when the final + // outstanding operation on this BlobID is complete (which is tracked by + // BlobInfo::last). 
} LocalUnlockBlob(context, blob_id); + VLOG(1) << "Done moving blob " + << internal_blob_name.substr(kBucketIdStringSize, std::string::npos) + << std::endl; } else { - LOG(WARNING) << "Couldn't lock BlobID " << blob_id.as_int << "\n"; + if (got_lock) { + LOG(WARNING) << "Couldn't lock BlobID " << blob_id.as_int << "\n"; + } + } + + if (got_lock) { + EndReaderLock(&mdm->bucket_delete_lock); } } @@ -407,6 +431,225 @@ void OrganizeBlob(SharedMemoryContext *context, RpcContext *rpc, } } +void EnforceCapacityThresholds(SharedMemoryContext *context, RpcContext *rpc, + ViolationInfo info) { + u32 target_node = info.target_id.bits.node_id; + if (target_node == rpc->node_id) { + LocalEnforceCapacityThresholds(context, rpc, info); + } else { + RpcCall(rpc, target_node, "RemoteEnforceCapacityThresholds", info); + } +} + +void LocalEnforceCapacityThresholds(SharedMemoryContext *context, + RpcContext *rpc, ViolationInfo info) { + MetadataManager *mdm = GetMetadataManagerFromContext(context); + + // TODO(chogan): Factor out the common code in the kMin and kMax cases + switch (info.violation) { + case ThresholdViolation::kMin: { + // TODO(chogan): Allow sorting Targets by any metric. This + // implementation only works if the Targets are listed in the + // configuration in order of decreasing bandwidth. 
+ for (u16 target_index = mdm->node_targets.length - 1; + target_index != info.target_id.bits.index; + --target_index) { + TargetID src_target_id = { + info.target_id.bits.node_id, target_index, target_index + }; + + Target *src_target = GetTargetFromId(context, src_target_id); + BeginTicketMutex(&src_target->effective_blobs_lock); + std::vector blob_ids = + GetChunkedIdList(mdm, src_target->effective_blobs); + EndTicketMutex(&src_target->effective_blobs_lock); + + auto compare_importance = [context](const u64 lhs, const u64 rhs) { + BlobID lhs_blob_id = {}; + lhs_blob_id.as_int = lhs; + f32 lhs_importance_score = LocalGetBlobImportanceScore(context, + lhs_blob_id); + + BlobID rhs_blob_id = {}; + rhs_blob_id.as_int = rhs; + f32 rhs_importance_score = LocalGetBlobImportanceScore(context, + rhs_blob_id); + + return lhs_importance_score < rhs_importance_score; + }; + + std::sort(blob_ids.begin(), blob_ids.end(), compare_importance); + + size_t bytes_moved = 0; + + for (size_t idx = 0; + idx < blob_ids.size() && bytes_moved < info.violation_size; + ++idx) { + BlobID most_important_blob {}; + std::vector buffers_to_move; + most_important_blob.as_int = blob_ids[idx]; + std::vector buffer_ids = + LocalGetBufferIdList(mdm, most_important_blob); + + // Filter out BufferIDs not in the Target + std::vector buffer_ids_in_target; + for (size_t i = 0; i < buffer_ids.size(); ++i) { + BufferHeader *header = GetHeaderByBufferId(context, buffer_ids[i]); + DeviceID device_id = header->device_id; + if (device_id == src_target_id.bits.device_id) { + // TODO(chogan): Needs to changes when we support num_devices != + // num_targets + buffer_ids_in_target.push_back(buffer_ids[i]); + } + } + std::vector buffer_info = + GetBufferInfo(context, rpc, buffer_ids_in_target); + auto buffer_info_comparator = [](const BufferInfo &lhs, + const BufferInfo &rhs) { + return lhs.size > rhs.size; + }; + // Sort in descending order + std::sort(buffer_info.begin(), buffer_info.end(), + 
buffer_info_comparator); + for (size_t j = 0; + j < buffer_info.size() && bytes_moved < info.violation_size; + ++j) { + buffers_to_move.push_back(buffer_info[j]); + bytes_moved += buffer_info[j].size; + } + + BoMoveList moves; + for (size_t i = 0; i < buffers_to_move.size(); ++i) { + PlacementSchema schema; + using SchemaPair = std::pair; + schema.push_back(SchemaPair(buffers_to_move[i].size, + info.target_id)); + std::vector dests = GetBuffers(context, schema); + if (dests.size() != 0) { + moves.push_back(std::pair(buffers_to_move[i].id, dests)); + } + } + + if (moves.size() > 0) { + // Queue BO task to move to lower tier + BucketID bucket_id = GetBucketIdFromBlobId(context, rpc, + most_important_blob); + std::string blob_name = + LocalGetBlobNameFromId(context, most_important_blob); + std::string internal_name = MakeInternalBlobName(blob_name, + bucket_id); + EnqueueBoMove(rpc, moves, most_important_blob, bucket_id, + internal_name, BoPriority::kLow); + } + } + } + break; + } + + case ThresholdViolation::kMax: { + Target *target = GetTargetFromId(context, info.target_id); + + f32 min_importance = FLT_MAX; + BlobID least_important_blob = {}; + + BeginTicketMutex(&target->effective_blobs_lock); + std::vector blob_ids = GetChunkedIdList(mdm, + target->effective_blobs); + EndTicketMutex(&target->effective_blobs_lock); + + // Find least important blob in violated Target + for (size_t i = 0; i < blob_ids.size(); ++i) { + BlobID blob_id = {}; + blob_id.as_int = blob_ids[i]; + f32 importance_score = LocalGetBlobImportanceScore(context, blob_id); + if (importance_score < min_importance) { + min_importance = importance_score; + least_important_blob = blob_id; + } + } + + assert(!IsNullBlobId(least_important_blob)); + + std::vector all_buffer_ids = + LocalGetBufferIdList(mdm, least_important_blob); + std::vector buffer_ids_in_target; + // Filter out BufferIDs not in this Target + for (size_t i = 0; i < all_buffer_ids.size(); ++i) { + BufferHeader *header = 
GetHeaderByBufferId(context, all_buffer_ids[i]); + DeviceID device_id = header->device_id; + if (device_id == info.target_id.bits.device_id) { + // TODO(chogan): Needs to changes when we support num_devices != + // num_targets + buffer_ids_in_target.push_back(all_buffer_ids[i]); + } + } + + std::vector buffer_info = + GetBufferInfo(context, rpc, buffer_ids_in_target); + auto buffer_info_comparator = [](const BufferInfo &lhs, + const BufferInfo &rhs) { + return lhs.size > rhs.size; + }; + // Sort in descending order + std::sort(buffer_info.begin(), buffer_info.end(), buffer_info_comparator); + + size_t bytes_moved = 0; + std::vector buffers_to_move; + size_t index = 0; + // Choose largest buffer until we've moved info.violation_size + while (bytes_moved < info.violation_size) { + buffers_to_move.push_back(buffer_info[index]); + bytes_moved += buffer_info[index].size; + index++; + } + + BoMoveList moves; + // TODO(chogan): Combine multiple smaller buffers into fewer larger + // buffers + for (size_t i = 0; i < buffers_to_move.size(); ++i) { + // TODO(chogan): Allow sorting Targets by any metric. This + // implementation only works if the Targets are listed in the + // configuration in order of decreasing bandwidth. 
+ for (u16 target_index = info.target_id.bits.index + 1; + target_index < mdm->node_targets.length; + ++target_index) { + // Select Target 1 Tier lower than violated Target + TargetID target_dest = { + info.target_id.bits.node_id, target_index, target_index + }; + + PlacementSchema schema; + schema.push_back(std::pair(bytes_moved, + target_dest)); + std::vector dests = GetBuffers(context, schema); + if (dests.size() != 0) { + moves.push_back(std::pair(buffers_to_move[i].id, dests)); + break; + } + } + } + + if (moves.size() > 0) { + // Queue BO task to move to lower tier + BucketID bucket_id = GetBucketIdFromBlobId(context, rpc, + least_important_blob); + std::string blob_name = + LocalGetBlobNameFromId(context, least_important_blob); + std::string internal_name = MakeInternalBlobName(blob_name, bucket_id); + EnqueueBoMove(rpc, moves, least_important_blob, bucket_id, + internal_name, BoPriority::kLow); + } else { + LOG(WARNING) + << "BufferOrganizer: No capacity available in lower Targets.\n"; + } + break; + } + default: { + HERMES_INVALID_CODE_PATH; + } + } +} + void LocalShutdownBufferOrganizer(SharedMemoryContext *context) { // NOTE(chogan): ThreadPool destructor needs to be called manually since we // allocated the BO instance with placement new. 
diff --git a/src/buffer_organizer.h b/src/buffer_organizer.h index e958ba0a2..968adf19d 100644 --- a/src/buffer_organizer.h +++ b/src/buffer_organizer.h @@ -98,6 +98,8 @@ void LocalOrganizeBlob(SharedMemoryContext *context, RpcContext *rpc, void OrganizeBlob(SharedMemoryContext *context, RpcContext *rpc, BucketID bucket_id, const std::string &blob_name, f32 epsilon, f32 importance_score = -1); +void OrganizeDevice(SharedMemoryContext *context, RpcContext *rpc, + DeviceID devices_id); std::vector GetBufferInfo(SharedMemoryContext *context, RpcContext *rpc, const std::vector &buffer_ids); @@ -111,7 +113,10 @@ void LocalEnqueueBoMove(SharedMemoryContext *context, RpcContext *rpc, void EnqueueBoMove(RpcContext *rpc, const BoMoveList &moves, BlobID blob_id, BucketID bucket_id, const std::string &internal_name, BoPriority priority); - +void EnforceCapacityThresholds(SharedMemoryContext *context, RpcContext *rpc, + ViolationInfo info); +void LocalEnforceCapacityThresholds(SharedMemoryContext *context, + RpcContext *rpc, ViolationInfo info); } // namespace hermes #endif // HERMES_BUFFER_ORGANIZER_H_ diff --git a/src/buffer_pool.cc b/src/buffer_pool.cc index 59b68ccb4..1df4c85e1 100644 --- a/src/buffer_pool.cc +++ b/src/buffer_pool.cc @@ -204,7 +204,7 @@ inline BufferHeader *GetHeaderByIndex(SharedMemoryContext *context, u32 index) { } BufferHeader *GetHeaderByBufferId(SharedMemoryContext *context, - BufferID id) { + BufferID id) { BufferHeader *result = GetHeaderByIndex(context, id.bits.header_index); return result; @@ -781,7 +781,7 @@ Device *InitDevices(Arena *arena, Config *config, f32 &min_bw, f32 &max_bw) { Target *InitTargets(Arena *arena, Config *config, Device *devices, int node_id) { - Target *result = PushArray(arena, config->num_targets); + Target *result = PushClearedArray(arena, config->num_targets); if (config->num_targets != config->num_devices) { HERMES_NOT_IMPLEMENTED_YET; @@ -1723,7 +1723,8 @@ SwapBlob PutToSwap(SharedMemoryContext *context, RpcContext 
*rpc, u32 target_node = rpc->node_id; SwapBlob swap_blob = WriteToSwap(context, blob, target_node, bucket_id); std::vector buffer_ids = SwapBlobToVec(swap_blob); - AttachBlobToBucket(context, rpc, name.c_str(), bucket_id, buffer_ids, true); + AttachBlobToBucket(context, rpc, name.c_str(), bucket_id, buffer_ids, + kSwapTargetId, true); return swap_blob; } @@ -1768,9 +1769,16 @@ api::Status PlaceBlob(SharedMemoryContext *context, RpcContext *rpc, WriteBlobToBuffers(context, rpc, blob, buffer_ids); HERMES_END_TIMED_BLOCK(); + std::pair max_target = + *std::max_element(schema.begin(), schema.end(), + [](const auto& lhs, const auto& rhs) { + return lhs.first < rhs.first; + }); + TargetID effective_target = max_target.second; + // NOTE(chogan): Update all metadata associated with this Put AttachBlobToBucket(context, rpc, name.c_str(), bucket_id, buffer_ids, - false, called_from_buffer_organizer); + effective_target, false, called_from_buffer_organizer); } else { if (ctx.disable_swap) { result = PLACE_SWAP_BLOB_TO_BUF_FAILED; diff --git a/src/buffer_pool.h b/src/buffer_pool.h index 707303198..3002b69f9 100644 --- a/src/buffer_pool.h +++ b/src/buffer_pool.h @@ -75,6 +75,8 @@ struct Target { u64 capacity; std::atomic remaining_space; std::atomic speed; + ChunkedIdList effective_blobs; + TicketMutex effective_blobs_lock; }; /** diff --git a/src/config_parser.cc b/src/config_parser.cc index 7fbe6883b..3144dbc22 100644 --- a/src/config_parser.cc +++ b/src/config_parser.cc @@ -82,6 +82,7 @@ static const char *kConfigVariableStrings[ConfigVariable_Count] = { "is_shared_device", "buffer_organizer_num_threads", "default_rr_split", + "bo_capacity_thresholds", }; EntireFile ReadEntireFile(Arena *arena, const char *path) { @@ -505,8 +506,8 @@ Token *ParseIntList(Token *tok, int *out, int n) { return tok; } -Token *ParseIntListList(Token *tok, int out[][hermes::kMaxBufferPoolSlabs], - int n, int *m) { +template +Token *ParseIntListList(Token *tok, int out[][N], int n, int *m) { if 
(IsOpenCurlyBrace(tok)) { tok = tok->next; for (int i = 0; i < n; ++i) { @@ -570,8 +571,8 @@ Token *ParseFloatList(Token *tok, f32 *out, int n) { return tok; } -Token *ParseFloatListList(Token *tok, f32 out[][hermes::kMaxBufferPoolSlabs], - int n, int *m) { +template +Token *ParseFloatListList(Token *tok, f32 out[][N], int n, int *m) { if (IsOpenCurlyBrace(tok)) { tok = tok->next; for (int i = 0; i < n; ++i) { @@ -979,6 +980,21 @@ void ParseTokens(TokenList *tokens, Config *config) { config->default_rr_split = ParseInt(&tok); break; } + case ConfigVariable_BOCapacityThresholds: { + RequireNumDevices(config); + // Each entry has a min and max threshold + std::vector num_thresholds(config->num_devices, 2); + float thresholds[kMaxDevices][2] = {0}; + + tok = ParseFloatListList(tok, thresholds, config->num_devices, + num_thresholds.data()); + + for (int i = 0; i < config->num_devices; ++i) { + config->bo_capacity_thresholds[i].min = thresholds[i][0]; + config->bo_capacity_thresholds[i].max = thresholds[i][1]; + } + break; + } default: { HERMES_INVALID_CODE_PATH; break; diff --git a/src/config_parser.h b/src/config_parser.h index 480e9f2bd..68743e4b1 100644 --- a/src/config_parser.h +++ b/src/config_parser.h @@ -69,6 +69,7 @@ enum ConfigVariable { ConfigVariable_IsSharedDevice, ConfigVariable_BoNumThreads, ConfigVariable_RRSplit, + ConfigVariable_BOCapacityThresholds, ConfigVariable_Count }; diff --git a/src/data_placement_engine.cc b/src/data_placement_engine.cc index aea6d6504..70e965466 100644 --- a/src/data_placement_engine.cc +++ b/src/data_placement_engine.cc @@ -258,7 +258,7 @@ Status MinimizeIoTimePlacement(const std::vector &blob_sizes, ctx.minimize_io_time_options.capacity_change_threshold; size_t constraints_per_target = 1; - DLOG(INFO) << "MinimizeIoTimePlacement()::minimum_remaining_capacity=" << + VLOG(1) << "MinimizeIoTimePlacement()::minimum_remaining_capacity=" << minimum_remaining_capacity; if (minimum_remaining_capacity != 0) { 
constraints_per_target++; @@ -266,7 +266,7 @@ Status MinimizeIoTimePlacement(const std::vector &blob_sizes, if (capacity_change_threshold != 0) { constraints_per_target++; } - DLOG(INFO) << "MinimizeIoTimePlacement()::constraints_per_target=" << + VLOG(1) << "MinimizeIoTimePlacement()::constraints_per_target=" << constraints_per_target; const size_t total_constraints = num_blobs + (num_targets * constraints_per_target) - 1; @@ -351,23 +351,27 @@ Status MinimizeIoTimePlacement(const std::vector &blob_sizes, int last4 = 0; // Placement Ratio - for (size_t j {0}; j < num_targets-1; ++j) { - std::string row_name {"pr_row_" + std::to_string(j)}; - glp_set_row_name(lp, num_constrts+j+1, row_name.c_str()); - glp_set_row_bnds(lp, num_constrts+j+1, GLP_LO, 0.0, 0.0); - - for (size_t i {0}; i < num_blobs; ++i) { - int ij = j * num_blobs + i + 1 + last3 + j; - ia[ij] = num_constrts+j+1, ja[ij] = j+2, + if (ctx.minimize_io_time_options.use_placement_ratio) { + for (size_t j {0}; j < num_targets-1; ++j) { + std::string row_name {"pr_row_" + std::to_string(j)}; + glp_set_row_name(lp, num_constrts+j+1, row_name.c_str()); + glp_set_row_bnds(lp, num_constrts+j+1, GLP_LO, 0.0, 0.0); + + for (size_t i {0}; i < num_blobs; ++i) { + int ij = j * num_blobs + i + 1 + last3 + j; + ia[ij] = num_constrts+j+1, ja[ij] = j+2, ar[ij] = static_cast(blob_sizes[i]); - double placement_ratio = static_cast(node_state[j+1])/ - node_state[j]; - ij = ij + 1; - ia[ij] = num_constrts+j+1, ja[ij] = j+1, + double placement_ratio = static_cast(node_state[j+1])/ + node_state[j]; + ij = ij + 1; + ia[ij] = num_constrts+j+1, ja[ij] = j+1, ar[ij] = static_cast(blob_sizes[i])*(0-placement_ratio); - last4 = ij; + last4 = ij; + } } + } else { + last4 = last3; } // Objective to minimize IO time @@ -378,7 +382,7 @@ Status MinimizeIoTimePlacement(const std::vector &blob_sizes, static_cast(blob_sizes[i])/bandwidths[j]); } } - DLOG(INFO) << "MinimizeIoTimePlacement()::last4=" << last4; + VLOG(1) << 
"MinimizeIoTimePlacement()::last4=" << last4; glp_load_matrix(lp, last4, ia, ja, ar); glp_smcp parm; diff --git a/src/hermes_types.h b/src/hermes_types.h index bbe878f24..baa3dbbb3 100644 --- a/src/hermes_types.h +++ b/src/hermes_types.h @@ -43,6 +43,12 @@ typedef double f64; typedef u16 DeviceID; +struct ChunkedIdList { + u32 head_offset; + u32 length; + u32 capacity; +}; + namespace api { typedef std::vector Blob; @@ -56,11 +62,14 @@ enum class PlacementPolicy { struct MinimizeIoTimeOptions { double minimum_remaining_capacity; double capacity_change_threshold; - - MinimizeIoTimeOptions(double minimum_remaining_capacity = 0.1, - double capacity_change_threshold = 0.2) - : minimum_remaining_capacity(minimum_remaining_capacity), - capacity_change_threshold(capacity_change_threshold) { + bool use_placement_ratio; + + MinimizeIoTimeOptions(double minimum_remaining_capacity_ = 0.0, + double capacity_change_threshold_ = 0.0, + bool use_placement_ratio_ = false) + : minimum_remaining_capacity(minimum_remaining_capacity_), + capacity_change_threshold(capacity_change_threshold_), + use_placement_ratio(use_placement_ratio_) { } }; @@ -141,6 +150,8 @@ union TargetID { u64 as_int; }; +const TargetID kSwapTargetId = {{0, 0, 0}}; + /** * A PlacementSchema is a vector of (size, target) pairs where size is the * number of bytes to buffer and target is the TargetID where to buffer those @@ -168,6 +179,11 @@ enum ArenaType { kArenaType_Count /**< Sentinel value */ }; +struct Thresholds { + float min; + float max; +}; + /** * System and user configuration that is used to initialize Hermes. */ @@ -245,7 +261,9 @@ struct Config { api::PlacementPolicy default_placement_policy; /** Whether blob splitting is enabled for Round-Robin blob placement. */ bool default_rr_split; - + /** The min and max capacity threshold in MiB for each device at which the + * BufferOrganizer will trigger. 
*/ + Thresholds bo_capacity_thresholds[kMaxDevices]; /** A base name for the BufferPool shared memory segement. Hermes appends the * value of the USER environment variable to this string. */ @@ -297,7 +315,7 @@ union BlobID { i32 node_id; } bits; - /** The BlobID as a unsigned 64-bit integer */ + /** The BlobID as an unsigned 64-bit integer */ u64 as_int; }; @@ -310,15 +328,9 @@ namespace api { enum class TraitType : u8 { META = 0, DATA = 1, - FILE_MAPPING = 2, - PERSIST = 3, -}; -} // namespace api - -struct TraitIdArray { - TraitID *ids; - u32 length; + PERSIST = 2, }; +} // namespace api } // namespace hermes #endif // HERMES_TYPES_H_ diff --git a/src/memory_management.cc b/src/memory_management.cc index f4246c363..3d367d344 100644 --- a/src/memory_management.cc +++ b/src/memory_management.cc @@ -415,6 +415,8 @@ void HeapFree(Heap *heap, void *ptr) { new_block = (FreeBlock *)((u8 *)(header + 1) + header->size - sizeof(FreeBlock)); } + + memset(ptr, 0, size); new_block->size = size + sizeof(FreeBlockHeader); HERMES_DEBUG_TRACK_FREE(header, new_block->size, heap->grows_up); @@ -469,6 +471,22 @@ Ticket TryBeginTicketMutex(TicketMutex *mutex, Ticket *existing_ticket) { return result; } +/** + * + */ +bool BeginTicketMutexIfNoWait(TicketMutex *mutex) { + u32 serving = mutex->serving.load(); + u32 ticket = mutex->ticket.load(); + u32 next = ticket + 1; + + bool result = false; + if (serving == ticket) { + result = mutex->ticket.compare_exchange_strong(ticket, next); + } + + return result; +} + void BeginTicketMutex(TicketMutex *mutex) { u32 ticket = mutex->ticket.fetch_add(1); while (ticket != mutex->serving.load()) { @@ -488,4 +506,53 @@ void EndTicketMutex(TicketMutex *mutex) { mutex->serving.fetch_add(1); } +const int kAttemptsBeforeYield = 100; + +bool BeginReaderLock(RwLock *lock) { + bool result = false; + if (!lock->writer_waiting.load()) { + lock->readers++; + result = true; + } + + return result; +} + +void EndReaderLock(RwLock *lock) { + u32 readers = 
lock->readers.load(); + + int retry = 0; + while (true) { + if (readers > 0) { + if (lock->readers.compare_exchange_weak(readers, readers - 1)) { + break; + } + } + retry++; + if (retry > kAttemptsBeforeYield) { + retry = 0; + sched_yield(); + } + } +} + +void BeginWriterLock(RwLock *lock) { + lock->writer_waiting.store(true); + + int retry = 0; + while (lock->readers.load() > 0) { + retry++; + if (retry > kAttemptsBeforeYield) { + retry = 0; + sched_yield(); + } + } + BeginTicketMutex(&lock->mutex); +} + +void EndWriterLock(RwLock *lock) { + EndTicketMutex(&lock->mutex); + lock->writer_waiting.store(false); +} + } // namespace hermes diff --git a/src/memory_management.h b/src/memory_management.h index cbe4dc26e..a629dd179 100644 --- a/src/memory_management.h +++ b/src/memory_management.h @@ -43,6 +43,12 @@ struct Ticket { bool acquired; }; +struct RwLock { + TicketMutex mutex; + std::atomic readers; + std::atomic writer_waiting; +}; + struct ArenaInfo { size_t sizes[kArenaType_Count]; size_t total; @@ -374,6 +380,11 @@ u8 *HeapExtentToPtr(Heap *heap); void BeginTicketMutex(TicketMutex *mutex); void EndTicketMutex(TicketMutex *mutex); +bool BeginReaderLock(RwLock *lock); +void EndReaderLock(RwLock *lock); +void BeginWriterLock(RwLock *lock); +void EndWriterLock(RwLock *lock); + } // namespace hermes #endif // HERMES_MEMORY_MANAGEMENT_H_ diff --git a/src/metadata_management.cc b/src/metadata_management.cc index e366f5026..ff3283f4c 100644 --- a/src/metadata_management.cc +++ b/src/metadata_management.cc @@ -18,13 +18,19 @@ #include #include "memory_management.h" +#include "metadata_management_internal.h" #include "buffer_pool.h" #include "buffer_pool_internal.h" +#include "buffer_organizer.h" #include "rpc.h" #include "metadata_storage.h" namespace hermes { +bool operator!=(const TargetID &lhs, const TargetID &rhs) { + return lhs.as_int != rhs.as_int; +} + static bool IsNameTooLong(const std::string &name, size_t max) { bool result = false; if (name.size() + 1 >= 
max) { @@ -649,30 +655,44 @@ BufferIdArray GetBufferIdsFromBlobId(Arena *arena, return result; } -void LocalCreateBlobMetadata(MetadataManager *mdm, const std::string &blob_name, - BlobID blob_id) { +void LocalCreateBlobMetadata(SharedMemoryContext *context, MetadataManager *mdm, + const std::string &blob_name, BlobID blob_id, + TargetID effective_target) { LocalPut(mdm, blob_name.c_str(), blob_id.as_int, kMapType_BlobId); BlobInfo blob_info = {}; blob_info.stats.frequency = 1; blob_info.stats.recency = mdm->clock++; + blob_info.effective_target = effective_target; + + if (effective_target != kSwapTargetId) { + assert(blob_id.bits.node_id == (int)effective_target.bits.node_id); + Target *target = GetTargetFromId(context, effective_target); + BeginTicketMutex(&target->effective_blobs_lock); + AppendToChunkedIdList(mdm, &target->effective_blobs, blob_id.as_int); + EndTicketMutex(&target->effective_blobs_lock); + } + LocalPut(mdm, blob_id, blob_info); } -void CreateBlobMetadata(MetadataManager *mdm, RpcContext *rpc, - const std::string &blob_name, BlobID blob_id) { +void CreateBlobMetadata(SharedMemoryContext *context, RpcContext *rpc, + const std::string &blob_name, BlobID blob_id, + TargetID effective_target) { + MetadataManager *mdm = GetMetadataManagerFromContext(context); u32 target_node = GetBlobNodeId(blob_id); if (target_node == rpc->node_id) { - LocalCreateBlobMetadata(mdm, blob_name, blob_id); + LocalCreateBlobMetadata(context, mdm, blob_name, blob_id, effective_target); } else { RpcCall(rpc, target_node, "RemoteCreateBlobMetadata", blob_name, - blob_id); + blob_id, effective_target); } } void AttachBlobToBucket(SharedMemoryContext *context, RpcContext *rpc, const char *blob_name, BucketID bucket_id, const std::vector &buffer_ids, - bool is_swap_blob, bool called_from_buffer_organizer) { + TargetID effective_target, bool is_swap_blob, + bool called_from_buffer_organizer) { MetadataManager *mdm = GetMetadataManagerFromContext(context); std::string 
internal_name = MakeInternalBlobName(blob_name, bucket_id); @@ -699,7 +719,7 @@ void AttachBlobToBucket(SharedMemoryContext *context, RpcContext *rpc, blob_id.bits.buffer_ids_offset = AllocateBufferIdList(context, rpc, target_node, buffer_ids); - CreateBlobMetadata(mdm, rpc, internal_name, blob_id); + CreateBlobMetadata(context, rpc, internal_name, blob_id, effective_target); AddBlobIdToBucket(mdm, rpc, blob_id, bucket_id); } @@ -727,6 +747,10 @@ void WaitForOutstandingBlobOps(MetadataManager *mdm, BlobID blob_id) { BlobInfo *blob_info = GetBlobInfoPtr(mdm, blob_id); if (blob_info) { t = TryBeginTicketMutex(&blob_info->lock, ticket); + } else { + // Blob was deleted + ReleaseBlobInfoPtr(mdm); + break; } if (!t.acquired) { ReleaseBlobInfoPtr(mdm); @@ -929,7 +953,7 @@ SystemViewState *GetLocalSystemViewState(SharedMemoryContext *context) { } std::vector LocalGetGlobalDeviceCapacities(SharedMemoryContext *context) { - SystemViewState *global_svs = GetGlobalSystemViewState(context); + GlobalSystemViewState *global_svs = GetGlobalSystemViewState(context); std::vector result(global_svs->num_devices); for (size_t i = 0; i < result.size(); ++i) { @@ -940,7 +964,7 @@ std::vector LocalGetGlobalDeviceCapacities(SharedMemoryContext *context) { } std::vector GetGlobalDeviceCapacities(SharedMemoryContext *context, - RpcContext *rpc) { + RpcContext *rpc) { MetadataManager *mdm = GetMetadataManagerFromContext(context); u32 target_node = mdm->global_system_view_state_node_id; @@ -956,25 +980,67 @@ std::vector GetGlobalDeviceCapacities(SharedMemoryContext *context, return result; } -SystemViewState *GetGlobalSystemViewState(SharedMemoryContext *context) { +GlobalSystemViewState *GetGlobalSystemViewState(SharedMemoryContext *context) { MetadataManager *mdm = GetMetadataManagerFromContext(context); - SystemViewState *result = - (SystemViewState *)((u8 *)mdm + mdm->global_system_view_state_offset); + GlobalSystemViewState *result = + (GlobalSystemViewState *)((u8 *)mdm + 
mdm->global_system_view_state_offset); assert((u8 *)result != (u8 *)mdm); return result; } -void LocalUpdateGlobalSystemViewState(SharedMemoryContext *context, - std::vector adjustments) { - for (size_t i = 0; i < adjustments.size(); ++i) { - SystemViewState *state = GetGlobalSystemViewState(context); - if (adjustments[i]) { - state->bytes_available[i].fetch_add(adjustments[i]); - DLOG(INFO) << "DeviceID " << i << " adjusted by " << adjustments[i] - << " bytes\n"; +std::vector +LocalUpdateGlobalSystemViewState(SharedMemoryContext *context, u32 node_id, + std::vector adjustments) { + std::vector result; + for (size_t device_idx = 0; device_idx < adjustments.size(); ++device_idx) { + GlobalSystemViewState *state = GetGlobalSystemViewState(context); + u32 target_idx = ((node_id - 1) * adjustments.size()) + device_idx; + if (adjustments[device_idx]) { + state->bytes_available[target_idx].fetch_add(adjustments[device_idx]); + DLOG(INFO) << "DeviceID " << device_idx << " on node " << node_id + << " adjusted by " << adjustments[device_idx] << " bytes\n"; + } + + // Collect devices for which to trigger the BufferOrganizer if the + // capacities are beyond the min/max thresholds + float percentage_available = 0.0f; + if (state->bytes_available[target_idx] > 0) { + percentage_available = ((f32)state->bytes_available[target_idx].load() / + (f32)state->capacities[device_idx]); + } + + ViolationInfo info = {}; + float percentage_violation = 0.0f; + f32 percentage_used = 1.0f - percentage_available; + float min_threshold = state->bo_capacity_thresholds[device_idx].min; + float max_threshold = state->bo_capacity_thresholds[device_idx].max; + + if (percentage_used > max_threshold) { + percentage_violation = percentage_used - max_threshold; + info.violation = ThresholdViolation::kMax; + } + if (percentage_used < min_threshold) { + percentage_violation = min_threshold - percentage_used; + info.violation = ThresholdViolation::kMin; + } + + if (percentage_violation > 0.0f) { + 
TargetID target_id = {}; + target_id.bits.node_id = node_id; + target_id.bits.device_id = (DeviceID)device_idx; + // TODO(chogan): This needs to change when we support num_devices != + // num_targets + target_id.bits.index = device_idx; + + info.target_id = target_id; + info.violation_size = + (size_t)(percentage_violation * state->capacities[device_idx]); + result.push_back(info); } } + + return result; } void UpdateGlobalSystemViewState(SharedMemoryContext *context, @@ -991,15 +1057,23 @@ void UpdateGlobalSystemViewState(SharedMemoryContext *context, } } + std::vector devices_to_organize; if (update_needed) { u32 target_node = mdm->global_system_view_state_node_id; if (target_node == rpc->node_id) { - LocalUpdateGlobalSystemViewState(context, adjustments); + devices_to_organize = + LocalUpdateGlobalSystemViewState(context, rpc->node_id, adjustments); } else { - RpcCall(rpc, target_node, "RemoteUpdateGlobalSystemViewState", - adjustments); + devices_to_organize = + RpcCall>(rpc, target_node, + "RemoteUpdateGlobalSystemViewState", + adjustments); } } + + for (size_t i = 0; i < devices_to_organize.size(); ++i) { + EnforceCapacityThresholds(context, rpc, devices_to_organize[i]); + } } TargetID FindTargetIdFromDeviceId(const std::vector &targets, @@ -1029,7 +1103,38 @@ SystemViewState *CreateSystemViewState(Arena *arena, Config *config) { SystemViewState *result = PushClearedStruct(arena); result->num_devices = config->num_devices; for (int i = 0; i < result->num_devices; ++i) { + result->capacities[i] = config->capacities[i]; result->bytes_available[i] = config->capacities[i]; + + // Min and max thresholds + result->bo_capacity_thresholds[i] = config->bo_capacity_thresholds[i]; + } + + return result; +} + +GlobalSystemViewState *CreateGlobalSystemViewState(RpcContext *rpc, + Arena *arena, + Config *config) { + GlobalSystemViewState *result = + PushClearedStruct(arena); + result->num_devices = config->num_devices; + + for (int i = 0; i < result->num_devices; ++i) { + 
result->capacities[i] = config->capacities[i]; + // Min and max thresholds + result->bo_capacity_thresholds[i] = config->bo_capacity_thresholds[i]; + } + size_t num_targets = config->num_devices * rpc->num_nodes; + result->num_targets = num_targets; + result->bytes_available = + PushClearedArray>(arena, num_targets); + + for (u32 node_idx = 0; node_idx < rpc->num_nodes; ++node_idx) { + for (int device_idx = 0; device_idx < result->num_devices; ++device_idx) { + u64 index = (node_idx * result->num_devices) + device_idx; + result->bytes_available[index].store(result->capacities[device_idx]); + } } return result; @@ -1089,11 +1194,11 @@ SwapBlob IdArrayToSwapBlob(BufferIdArray ids) { return result; } -void InitMetadataManager(MetadataManager *mdm, Arena *arena, Config *config, - int node_id) { +void InitMetadataManager(MetadataManager *mdm, RpcContext *rpc, Arena *arena, + Config *config) { // NOTE(chogan): All MetadataManager offsets are relative to the address of // the MDM itself. - + u32 node_id = rpc->node_id; arena->error_handler = MetadataArenaErrorHandler; mdm->map_seed = 0x4E58E5DF; @@ -1111,7 +1216,8 @@ void InitMetadataManager(MetadataManager *mdm, Arena *arena, Config *config, if (node_id == 1) { // NOTE(chogan): Only Node 1 has the Global SystemViewState - SystemViewState *global_state = CreateSystemViewState(arena, config); + GlobalSystemViewState *global_state = + CreateGlobalSystemViewState(rpc, arena, config); mdm->global_system_view_state_offset = GetOffsetFromMdm(mdm, global_state); } mdm->global_system_view_state_node_id = 1; @@ -1430,6 +1536,7 @@ bool LocalLockBlob(SharedMemoryContext *context, BlobID blob_id) { result = false; if (t.ticket == blob_info->last) { LocalDelete(mdm, blob_id); + LocalFreeBufferIdList(context, blob_id); } } ReleaseBlobInfoPtr(mdm); diff --git a/src/metadata_management.h b/src/metadata_management.h index acb53aebd..102853001 100644 --- a/src/metadata_management.h +++ b/src/metadata_management.h @@ -63,6 +63,17 @@ enum 
MapType { kMapType_Count }; +enum class ThresholdViolation { + kMin, + kMax +}; + +struct ViolationInfo { + TargetID target_id; + ThresholdViolation violation; + size_t violation_size; +}; + struct Stats { u32 recency; u32 frequency; @@ -70,12 +81,6 @@ struct Stats { const int kIdListChunkSize = 10; -struct ChunkedIdList { - u32 head_offset; - u32 length; - u32 capacity; -}; - struct IdList { u32 head_offset; u32 length; @@ -89,6 +94,7 @@ struct BufferIdArray { struct BlobInfo { Stats stats; TicketMutex lock; + TargetID effective_target; u32 last; bool stop; @@ -97,12 +103,16 @@ struct BlobInfo { stats.frequency = 0; lock.ticket.store(0); lock.serving.store(0); + effective_target.as_int = 0; } BlobInfo& operator=(const BlobInfo &other) { stats = other.stats; lock.ticket.store(other.lock.ticket.load()); lock.serving.store(other.lock.serving.load()); + effective_target = other.effective_target; + last = other.last; + stop = other.stop; return *this; } @@ -122,13 +132,49 @@ struct VBucketInfo { ChunkedIdList blobs; std::atomic ref_count; std::atomic async_flush_count; + /** Not currently used since Traits are process local. */ TraitID traits[kMaxTraitsPerVBucket]; bool active; }; struct SystemViewState { + /** Total capacities of each device. */ + u64 capacities[kMaxDevices]; + /** The remaining bytes available for buffering in each device. */ std::atomic bytes_available[kMaxDevices]; + /** The min and max threshold (percentage) for each device at which the + * BufferOrganizer will trigger. */ + Thresholds bo_capacity_thresholds[kMaxDevices]; + /** The total number of buffering devices. */ + int num_devices; +}; + +// TODO(chogan): +/** + * A snapshot view of the entire system's Targets' available capacities. + * + * This information is only stored on 1 node, designated by + * MetadataManager::global_system_view_state_node_id, and is only updated by 1 + * rank (the Hermes process on that node). 
Hence, it does not need to be stored + * in shared memory and we are able to use normal std containers. However, since + * multiple RPC handler threads can potentially update the `bytes_available` + * field concurrently, we must not do any operations on the vector itself. We + * can only do operations on the atomics within. The vector is created in + * StartGlobalSystemViewStateUpdateThread, and thereafter we can only call + * functions on the individual atomics (e.g., bytes_available[i].fetch_add), + * which is thread safe. + */ +struct GlobalSystemViewState { + /** The total number of buffering Targets in the system */ + u64 num_targets; + /** The number of devices per node */ int num_devices; + u64 capacities[kMaxDevices]; + /** The remaining capacity of each Target in the system */ + std::atomic *bytes_available; + /** The min and max capacity thresholds (percentage) for each Target in the + * system */ + Thresholds bo_capacity_thresholds[kMaxDevices]; }; struct MetadataManager { @@ -162,6 +208,8 @@ struct MetadataManager { /** Lock for accessing `BucketInfo` structures located at * `bucket_info_offset` */ TicketMutex bucket_mutex; + RwLock bucket_delete_lock; + /** Lock for accessing `VBucketInfo` structures located at * `vbucket_info_offset` */ TicketMutex vbucket_mutex; @@ -196,8 +244,8 @@ struct RpcContext; /** * */ -void InitMetadataManager(MetadataManager *mdm, Arena *arena, Config *config, - int node_id); +void InitMetadataManager(MetadataManager *mdm, RpcContext *rpc, Arena *arena, + Config *config); /** * @@ -287,7 +335,7 @@ VBucketID GetOrCreateVBucketId(SharedMemoryContext *context, RpcContext *rpc, void AttachBlobToBucket(SharedMemoryContext *context, RpcContext *rpc, const char *blob_name, BucketID bucket_id, const std::vector &buffer_ids, - bool is_swap_blob = false, + TargetID effective_target, bool is_swap_blob = false, bool called_from_buffer_organizer = false); /** diff --git a/src/metadata_management_internal.h 
b/src/metadata_management_internal.h index 7789978c1..18add4dc4 100644 --- a/src/metadata_management_internal.h +++ b/src/metadata_management_internal.h @@ -75,9 +75,10 @@ void LocalPut(MetadataManager *mdm, const char *key, u64 val, MapType map_type); void LocalDelete(MetadataManager *mdm, const char *key, MapType map_type); u64 LocalGetRemainingTargetCapacity(SharedMemoryContext *context, TargetID id); -void LocalUpdateGlobalSystemViewState(SharedMemoryContext *context, - std::vector adjustments); -SystemViewState *GetGlobalSystemViewState(SharedMemoryContext *context); +std::vector +LocalUpdateGlobalSystemViewState(SharedMemoryContext *context, u32 node_id, + std::vector adjustments); +GlobalSystemViewState *GetGlobalSystemViewState(SharedMemoryContext *context); std::vector LocalGetGlobalDeviceCapacities(SharedMemoryContext *context); std::vector GetGlobalDeviceCapacities(SharedMemoryContext *context, RpcContext *rpc); @@ -86,7 +87,7 @@ void UpdateGlobalSystemViewState(SharedMemoryContext *context, void StartGlobalSystemViewStateUpdateThread(SharedMemoryContext *context, RpcContext *rpc, Arena *arena, - double slepp_ms); + double sleep_ms); void InitMetadataStorage(SharedMemoryContext *context, MetadataManager *mdm, Arena *arena, Config *config); @@ -135,15 +136,18 @@ std::string LocalGetBucketNameById(SharedMemoryContext *context, BucketID blob_id); +void WaitForOutstandingBlobOps(MetadataManager *mdm, BlobID blob_id); int LocalGetNumOutstandingFlushingTasks(SharedMemoryContext *context, VBucketID id); int GetNumOutstandingFlushingTasks(SharedMemoryContext *context, RpcContext *rpc, VBucketID id); -void LocalCreateBlobMetadata(MetadataManager *mdm, const std::string &blob_name, - BlobID blob_id); +void LocalCreateBlobMetadata(SharedMemoryContext *context, MetadataManager *mdm, + const std::string &blob_name, BlobID blob_id, + TargetID effective_target); Heap *GetIdHeap(MetadataManager *mdm); Heap *GetMapHeap(MetadataManager *mdm); IdList 
AllocateIdList(MetadataManager *mdm, u32 length); void FreeIdList(MetadataManager *mdm, IdList id_list); +u32 AppendToChunkedIdList(MetadataManager *mdm, ChunkedIdList *id_list, u64 id); } // namespace hermes #endif // HERMES_METADATA_MANAGEMENT_INTERNAL_H_ diff --git a/src/metadata_storage.h b/src/metadata_storage.h index 9503c4eab..1d219a60c 100644 --- a/src/metadata_storage.h +++ b/src/metadata_storage.h @@ -88,6 +88,15 @@ BlobInfo *GetBlobInfoPtr(MetadataManager *mdm, BlobID blob_id); */ void ReleaseBlobInfoPtr(MetadataManager *mdm); +u64 *GetIdsPtr(MetadataManager *mdm, IdList id_list); +u64 *GetIdsPtr(MetadataManager *mdm, ChunkedIdList id_list); +void ReleaseIdsPtr(MetadataManager *mdm); + +/** + * + */ +std::vector GetChunkedIdList(MetadataManager *mdm, ChunkedIdList id_list); + } // namespace hermes #endif // HERMES_METADATA_STORAGE_H_ diff --git a/src/metadata_storage_stb_ds.cc b/src/metadata_storage_stb_ds.cc index 21e79f884..f83ebc374 100644 --- a/src/metadata_storage_stb_ds.cc +++ b/src/metadata_storage_stb_ds.cc @@ -223,12 +223,18 @@ Stats LocalGetBlobStats(SharedMemoryContext *context, BlobID blob_id) { * Return a pointer to the the internal array of IDs that the `id_list` * represents. * - * T must be an `IdList` or a `ChunkedIdList`. This call acquires a lock, and - * must be paired with a corresponding call to `ReleaseIdsPtr` to release the - * lock. + * This call acquires a lock, and must be paired with a corresponding call to + * `ReleaseIdsPtr` to release the lock. 
*/ -template -u64 *GetIdsPtr(MetadataManager *mdm, T id_list) { +u64 *GetIdsPtr(MetadataManager *mdm, IdList id_list) { + Heap *id_heap = GetIdHeap(mdm); + BeginTicketMutex(&mdm->id_mutex); + u64 *result = (u64 *)HeapOffsetToPtr(id_heap, id_list.head_offset); + + return result; +} + +u64 *GetIdsPtr(MetadataManager *mdm, ChunkedIdList id_list) { Heap *id_heap = GetIdHeap(mdm); BeginTicketMutex(&mdm->id_mutex); u64 *result = (u64 *)HeapOffsetToPtr(id_heap, id_list.head_offset); @@ -359,6 +365,24 @@ u32 AppendToChunkedIdList(MetadataManager *mdm, ChunkedIdList *id_list, return result; } +/** + * Assumes the caller has protected @p id_list with a lock. + * + * @return A vector of the IDs. + */ +std::vector GetChunkedIdList(MetadataManager *mdm, ChunkedIdList id_list) { + std::vector result(id_list.length); + if (id_list.length > 0) { + u64 *head = GetIdsPtr(mdm, id_list); + for (u32 i = 0; i < id_list.length; ++i) { + result[i] = head[i]; + } + ReleaseIdsPtr(mdm); + } + + return result; +} + u64 GetChunkedIdListElement(MetadataManager *mdm, ChunkedIdList *id_list, u32 index) { u64 result = 0; @@ -425,16 +449,19 @@ void LocalReplaceBlobIdInBucket(SharedMemoryContext *context, MetadataManager *mdm = GetMetadataManagerFromContext(context); BeginTicketMutex(&mdm->bucket_mutex); BucketInfo *info = LocalGetBucketInfoById(mdm, bucket_id); - ChunkedIdList *blobs = &info->blobs; - BlobID *blobs_arr = (BlobID *)GetIdsPtr(mdm, *blobs); - for (u32 i = 0; i < blobs->length; ++i) { - if (blobs_arr[i].as_int == old_blob_id.as_int) { - blobs_arr[i] = new_blob_id; - break; + if (info && info->active) { + ChunkedIdList *blobs = &info->blobs; + + BlobID *blobs_arr = (BlobID *)GetIdsPtr(mdm, *blobs); + for (u32 i = 0; i < blobs->length; ++i) { + if (blobs_arr[i].as_int == old_blob_id.as_int) { + blobs_arr[i] = new_blob_id; + break; + } } + ReleaseIdsPtr(mdm); } - ReleaseIdsPtr(mdm); EndTicketMutex(&mdm->bucket_mutex); } @@ -615,11 +642,10 @@ bool LocalDestroyBucket(SharedMemoryContext 
*context, RpcContext *rpc, const char *bucket_name, BucketID bucket_id) { bool destroyed = false; MetadataManager *mdm = GetMetadataManagerFromContext(context); + BeginWriterLock(&mdm->bucket_delete_lock); BeginTicketMutex(&mdm->bucket_mutex); BucketInfo *info = LocalGetBucketInfoById(mdm, bucket_id); - // TODO(chogan): @optimization Lock granularity can probably be relaxed if - // this is slow int ref_count = info->ref_count.load(); if (ref_count == 1) { if (HasAllocatedBlobs(info)) { @@ -637,6 +663,7 @@ bool LocalDestroyBucket(SharedMemoryContext *context, RpcContext *rpc, for (auto blob_id : blobs_to_destroy) { DestroyBlobById(context, rpc, blob_id, bucket_id); } + // Delete BlobId list FreeIdList(mdm, info->blobs); } @@ -661,6 +688,7 @@ bool LocalDestroyBucket(SharedMemoryContext *context, RpcContext *rpc, << ". It's refcount is " << ref_count << std::endl; } EndTicketMutex(&mdm->bucket_mutex); + EndWriterLock(&mdm->bucket_delete_lock); return destroyed; } diff --git a/src/rpc.h b/src/rpc.h index 1ba429c6c..55d49315e 100644 --- a/src/rpc.h +++ b/src/rpc.h @@ -42,6 +42,9 @@ struct RpcContext { /** Array of host numbers in shared memory. This size is * RpcContext::num_nodes */ int *host_numbers; + /** The number of host numbers that were present in the rpc_host_number_range + * entry in the config file*/ + size_t num_host_numbers; /** Array of host names stored in shared memory. This array size is * RpcContext::num_nodes. */ ShmemString *host_names; diff --git a/src/rpc_thallium.cc b/src/rpc_thallium.cc index 1bcf2c479..0edc3af61 100644 --- a/src/rpc_thallium.cc +++ b/src/rpc_thallium.cc @@ -22,7 +22,8 @@ namespace hermes { std::string GetHostNumberAsString(RpcContext *rpc, u32 node_id) { std::string result = ""; - if (rpc->num_nodes > 1) { + + if (rpc->num_host_numbers > 0) { // Subtract 1 because the node_id index starts at 1 instead of 0. We reserve // 0 so that BufferIDs (which are made from the node_id) can be NULL. 
int index = (node_id - 1); @@ -308,12 +309,12 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc, req.respond(result); }; - // TODO(chogan): Only need this on mdm->global_system_view_state_node_id. - // Probably should move it to a completely separate tl::engine. auto rpc_update_global_system_view_state = - [context](const request &req, std::vector adjustments) { - LocalUpdateGlobalSystemViewState(context, adjustments); - req.respond(true); + [context, rpc](const request &req, std::vector adjustments) { + std::vector result = + LocalUpdateGlobalSystemViewState(context, rpc->node_id, adjustments); + + req.respond(result); }; auto rpc_get_blob_ids = [context](const request &req, BucketID bucket_id) { @@ -418,9 +419,10 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc, auto rpc_create_blob_metadata = [context](const request &req, const std::string &blob_name, - BlobID blob_id) { + BlobID blob_id, TargetID effective_target) { MetadataManager *mdm = GetMetadataManagerFromContext(context); - LocalCreateBlobMetadata(mdm, blob_name, blob_id); + LocalCreateBlobMetadata(context, mdm, blob_name, blob_id, + effective_target); req.respond(true); }; @@ -433,6 +435,13 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc, req.respond(true); }; + auto rpc_enforce_capacity_thresholds = [context, rpc](const request &req, + ViolationInfo info) { + LocalEnforceCapacityThresholds(context, rpc, info); + // TODO(chogan): Can this be async? + req.respond(true); + }; + // TODO(chogan): Currently these three are only used for testing. 
rpc_server->define("GetBuffers", rpc_get_buffers); rpc_server->define("SplitBuffers", rpc_split_buffers).disable_response(); @@ -498,6 +507,8 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc, rpc_server->define("RemoteCreateBlobMetadata", rpc_create_blob_metadata); rpc_server->define("RemoteReplaceBlobIdInBucket", rpc_replace_blob_id_in_bucket); + rpc_server->define("RemoteEnforceCapacityThresholds", + rpc_enforce_capacity_thresholds); } void StartBufferOrganizer(SharedMemoryContext *context, RpcContext *rpc, @@ -639,6 +650,10 @@ void StopGlobalSystemViewStateUpdateThread(RpcContext *rpc) { void InitRpcContext(RpcContext *rpc, u32 num_nodes, u32 node_id, Config *config) { rpc->num_nodes = num_nodes; + // The number of host numbers in the rpc_host_number_range entry of the + // configuration file. Not necessarily the number of nodes because when there + // is only 1 node, the entry can be left blank, or contain 1 host number. + rpc->num_host_numbers = config->host_numbers.size(); rpc->node_id = node_id; rpc->start_server = ThalliumStartRpcServer; rpc->state_size = sizeof(ThalliumState); diff --git a/src/rpc_thallium.h b/src/rpc_thallium.h index 451539130..6a2aeb700 100644 --- a/src/rpc_thallium.h +++ b/src/rpc_thallium.h @@ -129,6 +129,11 @@ void serialize(A &ar, BufferInfo &info) { } #ifndef THALLIUM_USE_CEREAL + +// NOTE(chogan): Thallium's default serialization doesn't handle enums by +// default so we must write serialization code for all enums when we're not +// using cereal. + /** * Lets Thallium know how to serialize a MapType. 
* @@ -170,6 +175,19 @@ void load(A &ar, BoPriority &priority) { ar.read(&val, 1); priority = (BoPriority)val; } + +template +void save(A &ar, ThresholdViolation &violation) { + int val = (int)violation; + ar.write(&val, 1); +} + +template +void load(A &ar, ThresholdViolation &violation) { + int val = 0; + ar.read(&val, 1); + violation = (ThresholdViolation)val; +} #endif // #ifndef THALLIUM_USE_CEREAL @@ -198,6 +216,13 @@ void serialize(A &ar, BoTask &bo_task) { ar & bo_task.args; } +template +void serialize(A &ar, ViolationInfo &info) { + ar & info.target_id; + ar & info.violation; + ar & info.violation_size; +} + namespace api { template #ifndef THALLIUM_USE_CEREAL diff --git a/src/utils.cc b/src/utils.cc index b3977c000..216818b5d 100644 --- a/src/utils.cc +++ b/src/utils.cc @@ -120,6 +120,11 @@ void InitDefaultConfig(Config *config) { config->bo_num_threads = 4; config->default_rr_split = false; + + for (int i = 0; i < config->num_devices; ++i) { + config->bo_capacity_thresholds[i].min = 0.0f; + config->bo_capacity_thresholds[i].max = 1.0f; + } } void FailedLibraryCall(std::string func) { diff --git a/test/bucket_test.cc b/test/bucket_test.cc index aa9f90e92..484ad82ee 100644 --- a/test/bucket_test.cc +++ b/test/bucket_test.cc @@ -31,10 +31,10 @@ int compress_blob(HermesPtr hermes, hapi::TraitInput &input, hapi::Trait *trait); struct MyTrait : public hapi::Trait { int compress_level; - MyTrait() : Trait(10001, hermes::TraitIdArray(), hapi::TraitType::META) { - onLinkFn = - std::bind(&compress_blob, std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3); + MyTrait() : hapi::Trait(10001, std::vector(), + hapi::TraitType::META) { + onLinkFn = std::bind(&compress_blob, std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3); } // optional function pointer if only known at runtime diff --git a/test/buffer_organizer_test.cc b/test/buffer_organizer_test.cc index 882a7b13c..8b54f6ad4 100644 --- a/test/buffer_organizer_test.cc +++ 
b/test/buffer_organizer_test.cc @@ -25,6 +25,7 @@ namespace hapi = hermes::api; using HermesPtr = std::shared_ptr; using hermes::u8; using hermes::f32; +using hermes::u64; using hermes::SharedMemoryContext; using hermes::RpcContext; using hermes::BoTask; @@ -33,7 +34,8 @@ using hermes::TargetID; using hermes::BucketID; using hermes::BlobID; using hermes::BufferInfo; - +using hapi::VBucket; +using hapi::Bucket; static void TestIsBoFunction() { using hermes::IsBoFunction; @@ -243,6 +245,127 @@ void TestOrganizeBlob() { hermes->Finalize(true); } +static void TestWriteOnlyBucket() { + HermesPtr hermes = hermes::InitHermesDaemon(getenv("HERMES_CONF")); + std::string bkt_name = "WriteOnly"; + VBucket vbkt(bkt_name, hermes); + Bucket bkt(bkt_name, hermes); + + hapi::WriteOnlyTrait trait; + vbkt.Attach(&trait); + + const size_t kBlobSize = KILOBYTES(4); + hapi::Blob blob(kBlobSize); + std::iota(blob.begin(), blob.end(), 0); + + const int kIters = 128; + for (int i = 0; i < kIters; ++i) { + std::string blob_name = "b" + std::to_string(i); + bkt.Put(blob_name, blob); + vbkt.Link(blob_name, bkt_name); + } + + vbkt.Destroy(); + bkt.Destroy(); + hermes->Finalize(true); +} + +void TestMinThresholdViolation() { + hermes::Config config = {}; + InitDefaultConfig(&config); + + size_t cap = MEGABYTES(1); + config.capacities[0] = cap; + config.capacities[1] = cap; + config.capacities[2] = cap; + config.capacities[3] = cap; + + for (int i = 0; i < config.num_devices; ++i) { + config.num_slabs[i] = 1; + config.desired_slab_percentages[i][0] = 1.0; + } + + f32 min = 0.25f; + f32 max = 0.75f; + config.bo_capacity_thresholds[0] = {0, max}; + config.bo_capacity_thresholds[1] = {min, max}; + config.bo_capacity_thresholds[2] = {0, max}; + + HermesPtr hermes = hermes::InitHermesDaemon(&config); + + + hermes::RoundRobinState rr_state; + rr_state.SetCurrentDeviceIndex(2); + hapi::Context ctx; + ctx.policy = hapi::PlacementPolicy::kRoundRobin; + Bucket bkt(__func__, hermes, ctx); + // Blob is big 
enough to exceed minimum capacity of Target 1 + const size_t kBlobSize = (min * cap) + KILOBYTES(4); + hapi::Blob blob(kBlobSize, 'q'); + Assert(bkt.Put("1", blob).Succeeded()); + + // Let the BORG run. It should move enough data from Target 2 to Target 1 to + // fill > the minimum capacity threshold + std::this_thread::sleep_for(std::chrono::seconds(2)); + + // Check remaining capacities + std::vector targets = {{1, 1, 1}, {1, 2, 2}}; + std::vector capacities = + GetRemainingTargetCapacities(&hermes->context_, &hermes->rpc_, targets); + Assert(capacities[0] == cap - kBlobSize + KILOBYTES(4)); + Assert(capacities[1] == cap - KILOBYTES(4)); + + bkt.Destroy(); + hermes->Finalize(true); +} + +void TestMaxThresholdViolation() { + hermes::Config config = {}; + InitDefaultConfig(&config); + + size_t cap = MEGABYTES(1); + config.capacities[0] = cap; + config.capacities[1] = cap; + config.capacities[2] = cap; + config.capacities[3] = cap; + + for (int i = 0; i < config.num_devices; ++i) { + config.num_slabs[i] = 1; + config.desired_slab_percentages[i][0] = 1.0; + } + + f32 min = 0.0f; + f32 max = 0.75f; + config.bo_capacity_thresholds[0] = {min, max}; + config.bo_capacity_thresholds[1] = {min, max}; + config.bo_capacity_thresholds[2] = {min, max}; + + HermesPtr hermes = hermes::InitHermesDaemon(&config); + + hermes::RoundRobinState rr_state; + rr_state.SetCurrentDeviceIndex(1); + hapi::Context ctx; + ctx.policy = hapi::PlacementPolicy::kRoundRobin; + Bucket bkt(__func__, hermes, ctx); + // Exceed maximum capacity of Target 1 by 4KiB + const size_t kBlobSize = (max * MEGABYTES(1)) + KILOBYTES(4); + hapi::Blob blob(kBlobSize, 'q'); + Assert(bkt.Put("1", blob).Succeeded()); + + // Let the BORG run. 
It should move 4KiB from Target 1 to 2 + std::this_thread::sleep_for(std::chrono::seconds(2)); + + // Check remaining capacities + std::vector targets = {{1, 1, 1}, {1, 2, 2}}; + std::vector capacities = + GetRemainingTargetCapacities(&hermes->context_, &hermes->rpc_, targets); + Assert(capacities[0] == cap - kBlobSize + KILOBYTES(4)); + Assert(capacities[1] == cap - KILOBYTES(4)); + + bkt.Destroy(); + hermes->Finalize(true); +} + int main(int argc, char *argv[]) { int mpi_threads_provided; MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &mpi_threads_provided); @@ -251,10 +374,13 @@ int main(int argc, char *argv[]) { return 1; } - TestIsBoFunction(); - TestBackgroundFlush(); - TestBoMove(); - TestOrganizeBlob(); + HERMES_ADD_TEST(TestIsBoFunction); + HERMES_ADD_TEST(TestBackgroundFlush); + HERMES_ADD_TEST(TestBoMove); + HERMES_ADD_TEST(TestOrganizeBlob); + HERMES_ADD_TEST(TestWriteOnlyBucket); + HERMES_ADD_TEST(TestMinThresholdViolation); + HERMES_ADD_TEST(TestMaxThresholdViolation); MPI_Finalize(); diff --git a/test/config_parser_test.cc b/test/config_parser_test.cc index b3f7fdb1c..1eec11a24 100644 --- a/test/config_parser_test.cc +++ b/test/config_parser_test.cc @@ -223,6 +223,12 @@ void TestDefaultConfig(Arena *arena, const char *config_file) { Assert(config.is_shared_device[2] == 0); Assert(config.is_shared_device[3] == 0); + + for (int i = 0; i < config.num_devices; ++i) { + Assert(config.bo_capacity_thresholds[i].min == 0.0f); + Assert(config.bo_capacity_thresholds[i].max == 1.0f); + } + Assert(config.bo_num_threads == 4); } diff --git a/test/data/hermes.conf b/test/data/hermes.conf index 286d0b269..5bde5ca8d 100644 --- a/test/data/hermes.conf +++ b/test/data/hermes.conf @@ -1,7 +1,5 @@ # Example Hermes configuration file -# TODO(chogan): Allow specifying capacity values in bytes, KiB, or GiB. - # The number of buffering tiers available. For example, RAM, NVMe, burst # buffer, and parallel file system would be 4 tiers. 
num_devices = 4; @@ -114,3 +112,19 @@ default_placement_policy = "MinimizeIoTime"; # If true (1) the RoundRobin placement policy algorithm will split each Blob # into a random number of smaller Blobs. default_rr_split = 0; + +# For each device, the minimum and maximum percent capacity threshold at which +# the BufferOrganizer will trigger. Decreasing the maximum thresholds will cause +# the BufferOrganizer to move data to lower devices, making more room in faster +# devices (ideal for write-heavy workloads). Conversely, increasing the minimum +# threshold will cause data to be moved from slower devices into faster devices +# (ideal for read-heavy workloads). For example, a maximum capacity threshold of +# 0.8 would have the effect of always keeping 20% of the device's space free for +# incoming writes. Conversely, a minimum capacity threshold of 0.3 would ensure +# that the device is always at least 30% occupied. +bo_capacity_thresholds = { + {0.0, 1.0}, + {0.0, 1.0}, + {0.0, 1.0}, + {0.0, 1.0}, +}; \ No newline at end of file diff --git a/test/dpe_optimization_test.cc b/test/dpe_optimization_test.cc index 5ae476af4..b650b9e5d 100644 --- a/test/dpe_optimization_test.cc +++ b/test/dpe_optimization_test.cc @@ -30,10 +30,12 @@ void MinimizeIoTimePlaceBlob(std::vector &blob_sizes, << blob_sizes[0] << " to targets\n" << std::flush; std::vector targets = testing::GetDefaultTargets(node_state.num_devices); + api::Context ctx; + ctx.minimize_io_time_options = api::MinimizeIoTimeOptions(0, 0, true); Status result = MinimizeIoTimePlacement(blob_sizes, node_state.bytes_available, node_state.bandwidth, targets, - schemas_tmp); + schemas_tmp, ctx); if (result.Failed()) { std::cout << "\nMinimizeIoTimePlacement failed\n" << std::flush; exit(1); diff --git a/test/mdm_test.cc b/test/mdm_test.cc index 3706a9070..18bfce2ed 100644 --- a/test/mdm_test.cc +++ b/test/mdm_test.cc @@ -17,7 +17,9 @@ #include "hermes.h" #include "bucket.h" #include "vbucket.h" +#include 
"buffer_pool_internal.h" #include "metadata_management_internal.h" +#include "metadata_storage.h" #include "test_utils.h" using namespace hermes; // NOLINT(*) @@ -380,6 +382,50 @@ static void TestMdmViz() { hermes->Finalize(true); } +static void TestEffectiveTarget() { + using namespace hermes; // NOLINT(*) + + hermes::Config config = {}; + hermes::InitDefaultConfig(&config); + config.default_placement_policy = hapi::PlacementPolicy::kRoundRobin; + config.default_rr_split = 0; + HermesPtr hermes = hermes::InitHermesDaemon(&config); + + hermes::RoundRobinState rr_state; + rr_state.SetCurrentDeviceIndex(0); + + std::string bucket_name(__func__); + hapi::Bucket bucket(bucket_name, hermes); + hapi::Blob data(4 * 1024, 'z'); + std::string blob_name("1"); + Assert(bucket.Put(blob_name, data).Succeeded()); + + SharedMemoryContext *context = &hermes->context_; + RpcContext *rpc = &hermes->rpc_; + MetadataManager *mdm = GetMetadataManagerFromContext(context); + + // Check BlobInfo::effective_target + BucketID bucket_id = GetBucketId(context, rpc, bucket_name.c_str()); + BlobID blob_id = GetBlobId(context, rpc, blob_name, bucket_id, false); + BlobInfo *info = GetBlobInfoPtr(mdm, blob_id); + TargetID expected_target_id = {{1, 0, 0}}; + Assert(info->effective_target.as_int == expected_target_id.as_int); + ReleaseBlobInfoPtr(mdm); + + // Check Target::effective_blobs + Target *ram_target = GetTarget(context, 0); + Assert(ram_target->effective_blobs.length == 1); + u64 *ids = GetIdsPtr(mdm, ram_target->effective_blobs); + BlobID effective_blob_id = {}; + effective_blob_id.as_int = ids[0]; + Assert(effective_blob_id.as_int == blob_id.as_int); + ReleaseIdsPtr(mdm); + + bucket.Destroy(); + + hermes->Finalize(true); +} + int main(int argc, char **argv) { int mpi_threads_provided; MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &mpi_threads_provided); @@ -390,24 +436,25 @@ int main(int argc, char **argv) { HermesPtr hermes = hapi::InitHermes(NULL, true); - TestNullIds(); - 
TestGetMapMutex(); - TestLocalGetNextFreeBucketId(hermes); - TestGetOrCreateBucketId(hermes); - TestRenameBlob(hermes); - TestRenameBucket(hermes); - TestBucketRefCounting(hermes); - TestMaxNameLength(hermes); - TestGetRelativeNodeId(); - TestDuplicateBlobNames(hermes); - TestGetBucketIdFromBlobId(hermes); - TestHexStringToU64(); + HERMES_ADD_TEST(TestNullIds); + HERMES_ADD_TEST(TestGetMapMutex); + HERMES_ADD_TEST(TestLocalGetNextFreeBucketId, hermes); + HERMES_ADD_TEST(TestGetOrCreateBucketId, hermes); + HERMES_ADD_TEST(TestRenameBlob, hermes); + HERMES_ADD_TEST(TestRenameBucket, hermes); + HERMES_ADD_TEST(TestBucketRefCounting, hermes); + HERMES_ADD_TEST(TestMaxNameLength, hermes); + HERMES_ADD_TEST(TestGetRelativeNodeId); + HERMES_ADD_TEST(TestDuplicateBlobNames, hermes); + HERMES_ADD_TEST(TestGetBucketIdFromBlobId, hermes); + HERMES_ADD_TEST(TestHexStringToU64); hermes->Finalize(true); - TestSwapBlobsExistInBucket(); - TestBlobInfoMap(); - TestMdmViz(); + HERMES_ADD_TEST(TestSwapBlobsExistInBucket); + HERMES_ADD_TEST(TestBlobInfoMap); + HERMES_ADD_TEST(TestMdmViz); + HERMES_ADD_TEST(TestEffectiveTarget); MPI_Finalize(); diff --git a/test/test_utils.h b/test/test_utils.h index 906a508db..8a2ea063e 100644 --- a/test/test_utils.h +++ b/test/test_utils.h @@ -24,6 +24,12 @@ #include "hermes_types.h" #include "bucket.h" +#define HERMES_ADD_TEST(test_name, ...) \ + if (argc == 1 || std::string(argv[1]) == #test_name) { \ + fprintf(stdout, "### Running %s\n", #test_name); \ + test_name(__VA_ARGS__); \ + } + namespace hermes { namespace testing { @@ -43,6 +49,9 @@ class Timer { double getElapsedTime() { return elapsed_time; } + void reset() { + elapsed_time = 0; + } private: std::chrono::high_resolution_clock::time_point t1; double elapsed_time;