Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send Blobs to swap space on GetBuffers failure #69

Merged
merged 1 commit into from
Jan 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions src/api/bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,10 @@ Status Bucket::Put(const std::string &name, const u8 *data, size_t size,
for (size_t i = 0; i < size; ++i) {
blobs[0][i] = data[i];
}
ret = PlaceBlobs(schemas, blobs, names);
ret = PlaceBlobs(schemas, blobs, names, ctx.buffer_organizer_retries);
} else {
SwapBlob swap_blob = PutToSwap(&hermes_->context_, &hermes_->rpc_, name,
id_, data, size);
TriggerBufferOrganizer(&hermes_->rpc_, kPlaceInHierarchy, name, swap_blob,
ctx.buffer_organizer_retries);
ret = 0;
// TODO(chogan): @errorhandling Signify in Status that the Blob went to
// swap space
// TODO(chogan): @errorhandling No space left or contraints unsatisfiable.
ret = 1;
}
}

Expand Down
21 changes: 6 additions & 15 deletions src/api/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class Bucket {
template<typename T>
Status PlaceBlobs(std::vector<PlacementSchema> &schemas,
const std::vector<std::vector<T>> &blobs,
const std::vector<std::string> &names);
const std::vector<std::string> &names, int retries);

/**
*
Expand Down Expand Up @@ -144,7 +144,7 @@ Status Bucket::Put(const std::string &name, const std::vector<T> &data,
template<typename T>
Status Bucket::PlaceBlobs(std::vector<PlacementSchema> &schemas,
const std::vector<std::vector<T>> &blobs,
const std::vector<std::string> &names) {
const std::vector<std::string> &names, int retries) {
Status result = 0;

for (size_t i = 0; i < schemas.size(); ++i) {
Expand All @@ -154,9 +154,8 @@ Status Bucket::PlaceBlobs(std::vector<PlacementSchema> &schemas,
blob.size = blobs[i].size() * sizeof(T);
LOG(INFO) << "Attaching blob '" << names[i] << "' to Bucket '" << name_
<< "'" << std::endl;
// TODO(chogan): @errorhandling What about partial failure?
result = PlaceBlob(&hermes_->context_, &hermes_->rpc_, schema, blob,
names[i].c_str(), id_);
names[i].c_str(), id_, retries);
}

return result;
Expand Down Expand Up @@ -188,18 +187,10 @@ Status Bucket::Put(std::vector<std::string> &names,
HERMES_END_TIMED_BLOCK();

if (ret == 0) {
ret = PlaceBlobs(schemas, blobs, names);
ret = PlaceBlobs(schemas, blobs, names, ctx.buffer_organizer_retries);
} else {
std::vector<SwapBlob> swapped_blobs =
PutToSwap(&hermes_->context_, &hermes_->rpc_, id_, blobs, names);

for (size_t i = 0; i < swapped_blobs.size(); ++i) {
TriggerBufferOrganizer(&hermes_->rpc_, kPlaceInHierarchy, names[i],
swapped_blobs[i], ctx.buffer_organizer_retries);
}
ret = 0;
// TODO(chogan): @errorhandling Signify in Status that the Blobs went to
// swap space
// TODO(chogan): @errorhandling No space left or contraints unsatisfiable.
ret = 1;
}
} else {
// TODO(chogan): @errorhandling
Expand Down
2 changes: 1 addition & 1 deletion src/buffer_organizer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ int PlaceInHierarchy(SharedMemoryContext *context, RpcContext *rpc,
}

ret = PlaceBlob(context, rpc, schemas[0], blob, name.c_str(),
swap_blob.bucket_id);
swap_blob.bucket_id, 0, true);
} else {
// TODO(chogan): @errorhandling
result = 1;
Expand Down
14 changes: 11 additions & 3 deletions src/buffer_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1595,7 +1595,8 @@ size_t ReadFromSwap(SharedMemoryContext *context, Blob blob,

Status PlaceBlob(SharedMemoryContext *context, RpcContext *rpc,
PlacementSchema &schema, Blob blob, const char *name,
BucketID bucket_id) {
BucketID bucket_id, int retries,
bool called_from_buffer_organizer) {
Status result = 0;
HERMES_BEGIN_TIMED_BLOCK("GetBuffers");
std::vector<BufferID> buffer_ids = GetBuffers(context, schema);
Expand All @@ -1609,8 +1610,15 @@ Status PlaceBlob(SharedMemoryContext *context, RpcContext *rpc,
// NOTE(chogan): Update all metadata associated with this Put
AttachBlobToBucket(context, rpc, name, bucket_id, buffer_ids);
} else {
// TODO(chogan): @errorhandling
result = 1;
if (called_from_buffer_organizer) {
// TODO(chogan): @errorhandling The BufferOrganizer failed to place a blob
// from swap space into the hierarchy.
result = 1;
} else {
SwapBlob swap_blob = PutToSwap(context, rpc, name, bucket_id, blob.data,
blob.size);
TriggerBufferOrganizer(rpc, kPlaceInHierarchy, name, swap_blob, retries);
}
}

return result;
Expand Down
3 changes: 2 additions & 1 deletion src/buffer_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,8 @@ int PlaceInHierarchy(SharedMemoryContext *context, RpcContext *rpc,
SwapBlob swap_blob, const std::string &blob_name);
api::Status PlaceBlob(SharedMemoryContext *context, RpcContext *rpc,
PlacementSchema &schema, Blob blob, const char *name,
BucketID bucket_id);
BucketID bucket_id, int retries,
bool called_from_buffer_organizer = false);
api::Status StdIoPersistBucket(SharedMemoryContext *context, RpcContext *rpc,
Arena *arena, BucketID bucket_id,
const std::string &file_name,
Expand Down
39 changes: 30 additions & 9 deletions test/buffer_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*/

using hermes::api::Hermes;
namespace hapi = hermes::api;

void TestGetBuffers(Hermes *hermes) {
using namespace hermes; // NOLINT(*)
Expand Down Expand Up @@ -72,15 +73,34 @@ void TestGetBandwidths(hermes::SharedMemoryContext *context) {
}
}

hapi::Status ForceBlobToSwap(Hermes *hermes, hermes::u64 id, hapi::Blob &blob,
const char *blob_name) {
using namespace hermes; // NOLINT(*)
PlacementSchema schema;
schema.push_back({blob.size(), testing::DefaultRamTargetId()});
Blob internal_blob = {};
internal_blob.data = blob.data();
internal_blob.size = blob.size();
hermes::BucketID bucket_id = {};
bucket_id.as_int = id;
int retries = 3;
hapi::Status result = PlaceBlob(&hermes->context_, &hermes->rpc_, schema,
internal_blob, blob_name, bucket_id, retries);

return result;
}

void TestSwap(std::shared_ptr<Hermes> hermes) {
namespace hapi = hermes::api;
hapi::Context ctx;
ctx.policy = hapi::PlacementPolicy::kRandom;
hapi::Bucket bucket(std::string("swap_bucket"), hermes, ctx);
hapi::Blob data(MEGABYTES(1), 'x');
size_t data_size = MEGABYTES(1);
hapi::Blob data(data_size, 'x');
std::string blob_name("swap_blob");
bucket.Put(blob_name, data, ctx);

hapi::Status status = ForceBlobToSwap(hermes.get(), bucket.GetId(), data,
blob_name.c_str());
Assert(status == 0);
// NOTE(chogan): The Blob is in the swap space, but the API behaves as normal.
Assert(bucket.ContainsBlob(blob_name));

hapi::Blob get_result;
Expand All @@ -94,22 +114,23 @@ void TestSwap(std::shared_ptr<Hermes> hermes) {
}

void TestBufferOrganizer(std::shared_ptr<Hermes> hermes) {
namespace hapi = hermes::api;
hapi::Context ctx;
ctx.policy = hapi::PlacementPolicy::kRandom;
hapi::Bucket bucket(std::string("bo_bucket"), hermes, ctx);

// NOTE(chogan): Fill our single buffer with a blob.
hapi::Blob data1(KILOBYTES(4), 'x');
std::string blob1_name("bo_blob1");
bucket.Put(blob1_name, data1, ctx);
hapi::Status status = bucket.Put(blob1_name, data1, ctx);
Assert(status == 0);
Assert(bucket.ContainsBlob(blob1_name));

// NOTE(chogan): Try to put another blob, which will go to the swap space
// since the hierarchy is full.
// NOTE(chogan): Force a second Blob to the swap space.
hapi::Blob data2(KILOBYTES(4), 'y');
std::string blob2_name("bo_blob2");
bucket.Put(blob2_name, data2, ctx);
status = ForceBlobToSwap(hermes.get(), bucket.GetId(), data2,
blob2_name.c_str());
Assert(status == 0);
Assert(bucket.BlobIsInSwap(blob2_name));

// NOTE(chogan): Delete the first blob, which will make room for the second,
Expand Down