Skip to content

Commit

Permalink
Merge 35a5026 into 0cbcb69
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopherHogan committed Jan 21, 2021
2 parents 0cbcb69 + 35a5026 commit f33fbcc
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 19 deletions.
2 changes: 1 addition & 1 deletion src/api/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ Status Bucket::PlaceBlobs(std::vector<PlacementSchema> &schemas,
LOG(INFO) << "Attaching blob '" << names[i] << "' to Bucket '" << name_
<< "'" << std::endl;
result = PlaceBlob(&hermes_->context_, &hermes_->rpc_, schema, blob,
names[i].c_str(), id_, retries);
names[i], id_, retries);
}

return result;
Expand Down
16 changes: 12 additions & 4 deletions src/buffer_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,8 @@ void SetFirstFreeBufferId(SharedMemoryContext *context, DeviceID device_id,
}
}

static std::atomic<u32> *GetAvailableBuffersArray(SharedMemoryContext *context,
DeviceID device_id) {
std::atomic<u32> *GetAvailableBuffersArray(SharedMemoryContext *context,
DeviceID device_id) {
BufferPool *pool = GetBufferPoolFromContext(context);
std::atomic<u32> *result =
(std::atomic<u32> *)(context->shm_base +
Expand Down Expand Up @@ -1594,10 +1594,18 @@ size_t ReadFromSwap(SharedMemoryContext *context, Blob blob,
}

Status PlaceBlob(SharedMemoryContext *context, RpcContext *rpc,
PlacementSchema &schema, Blob blob, const char *name,
PlacementSchema &schema, Blob blob, const std::string &name,
BucketID bucket_id, int retries,
bool called_from_buffer_organizer) {
Status result = 0;

if (ContainsBlob(context, rpc, bucket_id, name)) {
// TODO(chogan) @optimization If the existing buffers are already large
// enough to hold the new Blob, then we don't need to release them.
// Additionally, no metadata operations would be required.
DestroyBlobByName(context, rpc, bucket_id, name);
}

HERMES_BEGIN_TIMED_BLOCK("GetBuffers");
std::vector<BufferID> buffer_ids = GetBuffers(context, schema);
HERMES_END_TIMED_BLOCK();
Expand All @@ -1608,7 +1616,7 @@ Status PlaceBlob(SharedMemoryContext *context, RpcContext *rpc,
HERMES_END_TIMED_BLOCK();

// NOTE(chogan): Update all metadata associated with this Put
AttachBlobToBucket(context, rpc, name, bucket_id, buffer_ids);
AttachBlobToBucket(context, rpc, name.c_str(), bucket_id, buffer_ids);
} else {
if (called_from_buffer_organizer) {
// TODO(chogan): @errorhandling The BufferOrganizer failed to place a blob
Expand Down
4 changes: 2 additions & 2 deletions src/buffer_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -472,8 +472,8 @@ bool BufferIsByteAddressable(SharedMemoryContext *context, BufferID id);
int PlaceInHierarchy(SharedMemoryContext *context, RpcContext *rpc,
SwapBlob swap_blob, const std::string &blob_name);
api::Status PlaceBlob(SharedMemoryContext *context, RpcContext *rpc,
PlacementSchema &schema, Blob blob, const char *name,
BucketID bucket_id, int retries,
PlacementSchema &schema, Blob blob,
const std::string &name, BucketID bucket_id, int retries,
bool called_from_buffer_organizer = false);
api::Status StdIoPersistBucket(SharedMemoryContext *context, RpcContext *rpc,
Arena *arena, BucketID bucket_id,
Expand Down
5 changes: 5 additions & 0 deletions src/buffer_pool_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ Target *GetTarget(SharedMemoryContext *context, int index);
*/
Target *GetTargetFromId(SharedMemoryContext *context, TargetID id);

/**
*
*/
std::atomic<u32> *GetAvailableBuffersArray(SharedMemoryContext *context,
DeviceID device_id);
} // namespace hermes

#endif // HERMES_BUFFER_POOL_INTERNAL_H_
14 changes: 8 additions & 6 deletions src/metadata_management.cc
Original file line number Diff line number Diff line change
Expand Up @@ -562,12 +562,14 @@ bool ContainsBlob(SharedMemoryContext *context, RpcContext *rpc,
BlobID blob_id = GetBlobIdByName(context, rpc, blob_name.c_str());
bool result = false;

u32 target_node = bucket_id.bits.node_id;
if (target_node == rpc->node_id) {
result = LocalContainsBlob(context, bucket_id, blob_id);
} else {
result = RpcCall<bool>(rpc, target_node, "RemoteContainsBlob", bucket_id,
blob_name);
if (!IsNullBlobId(blob_id)) {
u32 target_node = bucket_id.bits.node_id;
if (target_node == rpc->node_id) {
result = LocalContainsBlob(context, bucket_id, blob_id);
} else {
result = RpcCall<bool>(rpc, target_node, "RemoteContainsBlob", bucket_id,
blob_name);
}
}

return result;
Expand Down
14 changes: 11 additions & 3 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,29 @@ target_compile_definitions(bp_test
#------------------------------------------------------------------------------

add_executable(config_parser_test config_parser_test.cc)
target_link_libraries(config_parser_test hermes)
target_link_libraries(config_parser_test hermes
$<$<BOOL:${HERMES_RPC_THALLIUM}>:thallium>)

add_test(NAME TestConfigParser
COMMAND config_parser_test ${CMAKE_CURRENT_SOURCE_DIR}/data/hermes.conf)
target_compile_definitions(config_parser_test
PRIVATE $<$<BOOL:${HERMES_RPC_THALLIUM}>:HERMES_RPC_THALLIUM>)

#------------------------------------------------------------------------------
# Memory Management tests
#------------------------------------------------------------------------------

add_executable(mem memory_test.cc)
target_link_libraries(mem hermes)
target_link_libraries(mem hermes $<$<BOOL:${HERMES_RPC_THALLIUM}>:thallium>)
add_test(NAME TestMemoryManagement COMMAND mem)
target_compile_definitions(mem
PRIVATE $<$<BOOL:${HERMES_RPC_THALLIUM}>:HERMES_RPC_THALLIUM>)

add_executable(stb_map stb_map_test.cc)
target_link_libraries(stb_map hermes ${LIBRT} MPI::MPI_CXX)
target_link_libraries(stb_map hermes ${LIBRT}
$<$<BOOL:${HERMES_RPC_THALLIUM}>:thallium> MPI::MPI_CXX)
target_compile_definitions(stb_map
PRIVATE $<$<BOOL:${HERMES_RPC_THALLIUM}>:HERMES_RPC_THALLIUM>)
add_test(NAME TestSTBMapWithHeap COMMAND stb_map)

#------------------------------------------------------------------------------
Expand Down
23 changes: 23 additions & 0 deletions test/bucket_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,28 @@ void TestBucketPersist(std::shared_ptr<hapi::Hermes> hermes) {
Assert(std::remove(saved_file.c_str()) == 0);
}

void TestPutOverwrite(std::shared_ptr<hapi::Hermes> hermes) {
hapi::Context ctx;
hapi::Bucket bucket("overwrite", hermes, ctx);

std::string blob_name("1");
size_t blob_size = KILOBYTES(6);
hapi::Blob blob(blob_size, 'x');
hapi::Status status = bucket.Put(blob_name, blob, ctx);
Assert(status == 0);

hermes::testing::GetAndVerifyBlob(bucket, blob_name, blob);

// NOTE(chogan): Overwrite the data
size_t new_size = KILOBYTES(9);
hapi::Blob new_blob(new_size, 'z');
status = bucket.Put(blob_name, new_blob, ctx);

hermes::testing::GetAndVerifyBlob(bucket, blob_name, new_blob);

bucket.Destroy(ctx);
}

int main(int argc, char **argv) {
int mpi_threads_provided;
MPI_Init_thread(NULL, NULL, MPI_THREAD_MULTIPLE, &mpi_threads_provided);
Expand Down Expand Up @@ -176,6 +198,7 @@ int main(int argc, char **argv) {
my_vb.Attach(&trait, ctx); // compress action to data starts

TestBucketPersist(hermes_app);
TestPutOverwrite(hermes_app);

///////
my_vb.Unlink("Blob1", "VB1", ctx);
Expand Down
53 changes: 52 additions & 1 deletion test/buffer_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
* node.
*/

using hermes::api::Hermes;
namespace hapi = hermes::api;
using hapi::Hermes;

void TestGetBuffers(Hermes *hermes) {
using namespace hermes; // NOLINT(*)
Expand Down Expand Up @@ -90,6 +90,55 @@ hapi::Status ForceBlobToSwap(Hermes *hermes, hermes::u64 id, hapi::Blob &blob,
return result;
}

/**
* Fills out @p config to represent one `Device` (RAM) with 2, 4 KB buffers.
*/
void MakeTwoBufferRAMConfig(hermes::Config *config) {
InitDefaultConfig(config);
config->num_devices = 1;
config->num_targets = 1;
config->capacities[0] = KILOBYTES(36);
config->desired_slab_percentages[0][0] = 1;
config->desired_slab_percentages[0][1] = 0;
config->desired_slab_percentages[0][2] = 0;
config->desired_slab_percentages[0][3] = 0;
config->arena_percentages[hermes::kArenaType_BufferPool] = 0.5;
config->arena_percentages[hermes::kArenaType_MetaData] = 0.5;
}

void TestBlobOverwrite() {
using namespace hermes; // NOLINT(*)
Config config = {};
MakeTwoBufferRAMConfig(&config);
std::shared_ptr<Hermes> hermes = hermes::InitHermesDaemon(&config);
SharedMemoryContext *context = &hermes->context_;
DeviceID ram_id = 0;
int slab_index = 0;
std::atomic<u32> *buffers_available =
GetAvailableBuffersArray(context, ram_id);
Assert(buffers_available[slab_index] == 2);

hapi::Context ctx;
ctx.policy = hapi::PlacementPolicy::kRandom;
hapi::Bucket bucket("overwrite", hermes, ctx);

std::string blob_name("1");
size_t blob_size = KILOBYTES(2);
hapi::Blob blob(blob_size, '1');
hapi::Status status = bucket.Put(blob_name, blob, ctx);
Assert(status == 0);

Assert(buffers_available[slab_index] == 1);

// NOTE(chogan): Overwrite the data
hapi::Blob new_blob(blob_size, '2');
status = bucket.Put(blob_name, new_blob, ctx);

Assert(buffers_available[slab_index] == 1);

hermes->Finalize(true);
}

void TestSwap(std::shared_ptr<Hermes> hermes) {
hapi::Context ctx;
ctx.policy = hapi::PlacementPolicy::kRandom;
Expand Down Expand Up @@ -216,6 +265,8 @@ int main(int argc, char **argv) {
TestGetBuffers(hermes.get());
TestGetBandwidths(&hermes->context_);
hermes->Finalize(true);

TestBlobOverwrite();
}

if (test_swap) {
Expand Down
18 changes: 16 additions & 2 deletions test/test_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <map>

#include "hermes_types.h"
#include "bucket.h"

namespace hermes {
namespace testing {
Expand Down Expand Up @@ -43,6 +44,8 @@ void Assert(bool expr, const char *file, int lineno, const char *message) {
}
}

#define Assert(expr) hermes::testing::Assert((expr), __FILE__, __LINE__, #expr)

struct TargetViewState {
std::vector<hermes::u64> bytes_capacity;
std::vector<hermes::u64> bytes_available;
Expand Down Expand Up @@ -143,9 +146,20 @@ void PrintNodeState(TargetViewState &node_state) {
}
}

void GetAndVerifyBlob(api::Bucket &bucket, const std::string &blob_name,
const api::Blob &expected) {
api::Context ctx;
api::Blob retrieved_blob;
size_t expected_size = expected.size();
size_t retrieved_size = bucket.Get(blob_name, retrieved_blob, ctx);
Assert(expected_size == retrieved_size);
retrieved_blob.resize(retrieved_size);
retrieved_size = bucket.Get(blob_name, retrieved_blob, ctx);
Assert(expected_size == retrieved_size);
Assert(retrieved_blob == expected);
}

} // namespace testing
} // namespace hermes

#define Assert(expr) hermes::testing::Assert((expr), __FILE__, __LINE__, #expr)

#endif // HERMES_TEST_UTILS_H_

0 comments on commit f33fbcc

Please sign in to comment.