Skip to content

Commit

Permalink
Send Blobs to swap space on GetBuffers failure
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopherHogan committed Jan 20, 2021
1 parent b25af17 commit db3dfab
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 37 deletions.
11 changes: 3 additions & 8 deletions src/api/bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,10 @@ Status Bucket::Put(const std::string &name, const u8 *data, size_t size,
for (size_t i = 0; i < size; ++i) {
blobs[0][i] = data[i];
}
ret = PlaceBlobs(schemas, blobs, names);
ret = PlaceBlobs(schemas, blobs, names, ctx.buffer_organizer_retries);
} else {
SwapBlob swap_blob = PutToSwap(&hermes_->context_, &hermes_->rpc_, name,
id_, data, size);
TriggerBufferOrganizer(&hermes_->rpc_, kPlaceInHierarchy, name, swap_blob,
ctx.buffer_organizer_retries);
ret = 0;
// TODO(chogan): @errorhandling Signify in Status that the Blob went to
// swap space
// TODO(chogan): @errorhandling No space left or contraints unsatisfiable.
ret = 1;
}
}

Expand Down
21 changes: 6 additions & 15 deletions src/api/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class Bucket {
template<typename T>
Status PlaceBlobs(std::vector<PlacementSchema> &schemas,
const std::vector<std::vector<T>> &blobs,
const std::vector<std::string> &names);
const std::vector<std::string> &names, int retries);

/**
*
Expand Down Expand Up @@ -144,7 +144,7 @@ Status Bucket::Put(const std::string &name, const std::vector<T> &data,
template<typename T>
Status Bucket::PlaceBlobs(std::vector<PlacementSchema> &schemas,
const std::vector<std::vector<T>> &blobs,
const std::vector<std::string> &names) {
const std::vector<std::string> &names, int retries) {
Status result = 0;

for (size_t i = 0; i < schemas.size(); ++i) {
Expand All @@ -154,9 +154,8 @@ Status Bucket::PlaceBlobs(std::vector<PlacementSchema> &schemas,
blob.size = blobs[i].size() * sizeof(T);
LOG(INFO) << "Attaching blob '" << names[i] << "' to Bucket '" << name_
<< "'" << std::endl;
// TODO(chogan): @errorhandling What about partial failure?
result = PlaceBlob(&hermes_->context_, &hermes_->rpc_, schema, blob,
names[i].c_str(), id_);
names[i].c_str(), id_, retries);
}

return result;
Expand Down Expand Up @@ -188,18 +187,10 @@ Status Bucket::Put(std::vector<std::string> &names,
HERMES_END_TIMED_BLOCK();

if (ret == 0) {
ret = PlaceBlobs(schemas, blobs, names);
ret = PlaceBlobs(schemas, blobs, names, ctx.buffer_organizer_retries);
} else {
std::vector<SwapBlob> swapped_blobs =
PutToSwap(&hermes_->context_, &hermes_->rpc_, id_, blobs, names);

for (size_t i = 0; i < swapped_blobs.size(); ++i) {
TriggerBufferOrganizer(&hermes_->rpc_, kPlaceInHierarchy, names[i],
swapped_blobs[i], ctx.buffer_organizer_retries);
}
ret = 0;
// TODO(chogan): @errorhandling Signify in Status that the Blobs went to
// swap space
// TODO(chogan): @errorhandling No space left or contraints unsatisfiable.
ret = 1;
}
} else {
// TODO(chogan): @errorhandling
Expand Down
2 changes: 1 addition & 1 deletion src/buffer_organizer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ int PlaceInHierarchy(SharedMemoryContext *context, RpcContext *rpc,
}

ret = PlaceBlob(context, rpc, schemas[0], blob, name.c_str(),
swap_blob.bucket_id);
swap_blob.bucket_id, 0, true);
} else {
// TODO(chogan): @errorhandling
result = 1;
Expand Down
14 changes: 11 additions & 3 deletions src/buffer_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1595,7 +1595,8 @@ size_t ReadFromSwap(SharedMemoryContext *context, Blob blob,

Status PlaceBlob(SharedMemoryContext *context, RpcContext *rpc,
PlacementSchema &schema, Blob blob, const char *name,
BucketID bucket_id) {
BucketID bucket_id, int retries,
bool called_from_buffer_organizer) {
Status result = 0;
HERMES_BEGIN_TIMED_BLOCK("GetBuffers");
std::vector<BufferID> buffer_ids = GetBuffers(context, schema);
Expand All @@ -1609,8 +1610,15 @@ Status PlaceBlob(SharedMemoryContext *context, RpcContext *rpc,
// NOTE(chogan): Update all metadata associated with this Put
AttachBlobToBucket(context, rpc, name, bucket_id, buffer_ids);
} else {
// TODO(chogan): @errorhandling
result = 1;
if (called_from_buffer_organizer) {
// TODO(chogan): @errorhandling The BufferOrganizer failed to place a blob
// from swap space into the hierarchy.
result = 1;
} else {
SwapBlob swap_blob = PutToSwap(context, rpc, name, bucket_id, blob.data,
blob.size);
TriggerBufferOrganizer(rpc, kPlaceInHierarchy, name, swap_blob, retries);
}
}

return result;
Expand Down
3 changes: 2 additions & 1 deletion src/buffer_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,8 @@ int PlaceInHierarchy(SharedMemoryContext *context, RpcContext *rpc,
SwapBlob swap_blob, const std::string &blob_name);
api::Status PlaceBlob(SharedMemoryContext *context, RpcContext *rpc,
PlacementSchema &schema, Blob blob, const char *name,
BucketID bucket_id);
BucketID bucket_id, int retries,
bool called_from_buffer_organizer = false);
api::Status StdIoPersistBucket(SharedMemoryContext *context, RpcContext *rpc,
Arena *arena, BucketID bucket_id,
const std::string &file_name,
Expand Down
39 changes: 30 additions & 9 deletions test/buffer_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*/

using hermes::api::Hermes;
namespace hapi = hermes::api;

void TestGetBuffers(Hermes *hermes) {
using namespace hermes; // NOLINT(*)
Expand Down Expand Up @@ -72,15 +73,34 @@ void TestGetBandwidths(hermes::SharedMemoryContext *context) {
}
}

hapi::Status ForceBlobToSwap(Hermes *hermes, hermes::u64 id, hapi::Blob &blob,
const char *blob_name) {
using namespace hermes; // NOLINT(*)
PlacementSchema schema;
schema.push_back({blob.size(), testing::DefaultRamTargetId()});
Blob internal_blob = {};
internal_blob.data = blob.data();
internal_blob.size = blob.size();
hermes::BucketID bucket_id = {};
bucket_id.as_int = id;
int retries = 3;
hapi::Status result = PlaceBlob(&hermes->context_, &hermes->rpc_, schema,
internal_blob, blob_name, bucket_id, retries);

return result;
}

void TestSwap(std::shared_ptr<Hermes> hermes) {
namespace hapi = hermes::api;
hapi::Context ctx;
ctx.policy = hapi::PlacementPolicy::kRandom;
hapi::Bucket bucket(std::string("swap_bucket"), hermes, ctx);
hapi::Blob data(MEGABYTES(1), 'x');
size_t data_size = MEGABYTES(1);
hapi::Blob data(data_size, 'x');
std::string blob_name("swap_blob");
bucket.Put(blob_name, data, ctx);

hapi::Status status = ForceBlobToSwap(hermes.get(), bucket.GetId(), data,
blob_name.c_str());
Assert(status == 0);
// NOTE(chogan): The Blob is in the swap space, but the API behaves as normal.
Assert(bucket.ContainsBlob(blob_name));

hapi::Blob get_result;
Expand All @@ -94,22 +114,23 @@ void TestSwap(std::shared_ptr<Hermes> hermes) {
}

void TestBufferOrganizer(std::shared_ptr<Hermes> hermes) {
namespace hapi = hermes::api;
hapi::Context ctx;
ctx.policy = hapi::PlacementPolicy::kRandom;
hapi::Bucket bucket(std::string("bo_bucket"), hermes, ctx);

// NOTE(chogan): Fill our single buffer with a blob.
hapi::Blob data1(KILOBYTES(4), 'x');
std::string blob1_name("bo_blob1");
bucket.Put(blob1_name, data1, ctx);
hapi::Status status = bucket.Put(blob1_name, data1, ctx);
Assert(status == 0);
Assert(bucket.ContainsBlob(blob1_name));

// NOTE(chogan): Try to put another blob, which will go to the swap space
// since the hierarchy is full.
// NOTE(chogan): Force a second Blob to the swap space.
hapi::Blob data2(KILOBYTES(4), 'y');
std::string blob2_name("bo_blob2");
bucket.Put(blob2_name, data2, ctx);
status = ForceBlobToSwap(hermes.get(), bucket.GetId(), data2,
blob2_name.c_str());
Assert(status == 0);
Assert(bucket.BlobIsInSwap(blob2_name));

// NOTE(chogan): Delete the first blob, which will make room for the second,
Expand Down

0 comments on commit db3dfab

Please sign in to comment.