Skip to content

Commit

Permalink
Fix memory leak when overwriting a blob
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopherHogan committed Jan 19, 2021
1 parent b29f0bd commit 1b08b62
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 15 deletions.
2 changes: 1 addition & 1 deletion src/api/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ Status Bucket::PlaceBlobs(std::vector<PlacementSchema> &schemas,
<< "'" << std::endl;
// TODO(chogan): @errorhandling What about partial failure?
result = PlaceBlob(&hermes_->context_, &hermes_->rpc_, schema, blob,
names[i].c_str(), id_);
names[i], id_);
}

return result;
Expand Down
14 changes: 11 additions & 3 deletions src/buffer_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ void SetFirstFreeBufferId(SharedMemoryContext *context, DeviceID device_id,
}
}

static std::atomic<u32> *GetAvailableBuffersArray(SharedMemoryContext *context,
std::atomic<u32> *GetAvailableBuffersArray(SharedMemoryContext *context,
DeviceID device_id) {
BufferPool *pool = GetBufferPoolFromContext(context);
std::atomic<u32> *result =
Expand Down Expand Up @@ -1594,9 +1594,17 @@ size_t ReadFromSwap(SharedMemoryContext *context, Blob blob,
}

Status PlaceBlob(SharedMemoryContext *context, RpcContext *rpc,
PlacementSchema &schema, Blob blob, const char *name,
PlacementSchema &schema, Blob blob, const std::string &name,
BucketID bucket_id) {
Status result = 0;

if (ContainsBlob(context, rpc, bucket_id, name)) {
// TODO(chogan) @optimization If the existing buffers are already large
// enough to hold the new Blob, then we don't need to release them.
// Additionally, no metadata operations would be required.
DestroyBlobByName(context, rpc, bucket_id, name);
}

HERMES_BEGIN_TIMED_BLOCK("GetBuffers");
std::vector<BufferID> buffer_ids = GetBuffers(context, schema);
HERMES_END_TIMED_BLOCK();
Expand All @@ -1607,7 +1615,7 @@ Status PlaceBlob(SharedMemoryContext *context, RpcContext *rpc,
HERMES_END_TIMED_BLOCK();

// NOTE(chogan): Update all metadata associated with this Put
AttachBlobToBucket(context, rpc, name, bucket_id, buffer_ids);
AttachBlobToBucket(context, rpc, name.c_str(), bucket_id, buffer_ids);
} else {
// TODO(chogan): @errorhandling
result = 1;
Expand Down
4 changes: 2 additions & 2 deletions src/buffer_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -472,8 +472,8 @@ bool BufferIsByteAddressable(SharedMemoryContext *context, BufferID id);
int PlaceInHierarchy(SharedMemoryContext *context, RpcContext *rpc,
SwapBlob swap_blob, const std::string &blob_name);
api::Status PlaceBlob(SharedMemoryContext *context, RpcContext *rpc,
PlacementSchema &schema, Blob blob, const char *name,
BucketID bucket_id);
PlacementSchema &schema, Blob blob,
const std::string &name, BucketID bucket_id);
api::Status StdIoPersistBucket(SharedMemoryContext *context, RpcContext *rpc,
Arena *arena, BucketID bucket_id,
const std::string &file_name,
Expand Down
5 changes: 5 additions & 0 deletions src/buffer_pool_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ Target *GetTarget(SharedMemoryContext *context, int index);
*/
Target *GetTargetFromId(SharedMemoryContext *context, TargetID id);

/**
*
*/
std::atomic<u32> *GetAvailableBuffersArray(SharedMemoryContext *context,
DeviceID device_id);
} // namespace hermes

#endif // HERMES_BUFFER_POOL_INTERNAL_H_
14 changes: 8 additions & 6 deletions src/metadata_management.cc
Original file line number Diff line number Diff line change
Expand Up @@ -562,12 +562,14 @@ bool ContainsBlob(SharedMemoryContext *context, RpcContext *rpc,
BlobID blob_id = GetBlobIdByName(context, rpc, blob_name.c_str());
bool result = false;

u32 target_node = bucket_id.bits.node_id;
if (target_node == rpc->node_id) {
result = LocalContainsBlob(context, bucket_id, blob_id);
} else {
result = RpcCall<bool>(rpc, target_node, "RemoteContainsBlob", bucket_id,
blob_name);
if (!IsNullBlobId(blob_id)) {
u32 target_node = bucket_id.bits.node_id;
if (target_node == rpc->node_id) {
result = LocalContainsBlob(context, bucket_id, blob_id);
} else {
result = RpcCall<bool>(rpc, target_node, "RemoteContainsBlob", bucket_id,
blob_name);
}
}

return result;
Expand Down
56 changes: 53 additions & 3 deletions test/buffer_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
* node.
*/

using hermes::api::Hermes;
namespace hapi = hermes::api;
using hapi::Hermes;

void TestGetBuffers(Hermes *hermes) {
using namespace hermes; // NOLINT(*)
Expand Down Expand Up @@ -72,8 +73,56 @@ void TestGetBandwidths(hermes::SharedMemoryContext *context) {
}
}

/**
* Fills out @p config to represent one `Device` (RAM) with 2, 4 KB buffers.
*/
void MakeTwoBufferRAMConfig(hermes::Config *config) {
InitDefaultConfig(config);
config->num_devices = 1;
config->num_targets = 1;
config->capacities[0] = KILOBYTES(36);
config->desired_slab_percentages[0][0] = 1;
config->desired_slab_percentages[0][1] = 0;
config->desired_slab_percentages[0][2] = 0;
config->desired_slab_percentages[0][3] = 0;
config->arena_percentages[hermes::kArenaType_BufferPool] = 0.5;
config->arena_percentages[hermes::kArenaType_MetaData] = 0.5;
}

void TestBlobOverwrite() {
using namespace hermes; // NOLINT(*)
Config config = {};
MakeTwoBufferRAMConfig(&config);
std::shared_ptr<Hermes> hermes = hermes::InitHermesDaemon(&config);
SharedMemoryContext *context = &hermes->context_;
DeviceID ram_id = 0;
int slab_index = 0;
std::atomic<u32> *buffers_available =
GetAvailableBuffersArray(context, ram_id);
Assert(buffers_available[slab_index] == 2);

hapi::Context ctx;
ctx.policy = hapi::PlacementPolicy::kRandom;
hapi::Bucket bucket("overwrite", hermes, ctx);

std::string blob_name("1");
size_t blob_size = KILOBYTES(2);
hapi::Blob blob(blob_size, '1');
hapi::Status status = bucket.Put(blob_name, blob, ctx);
Assert(status == 0);

Assert(buffers_available[slab_index] == 1);

// NOTE(chogan): Overwrite the data
hapi::Blob new_blob(blob_size, '2');
status = bucket.Put(blob_name, new_blob, ctx);

Assert(buffers_available[slab_index] == 1);

hermes->Finalize(true);
}

void TestSwap(std::shared_ptr<Hermes> hermes) {
namespace hapi = hermes::api;
hapi::Context ctx;
ctx.policy = hapi::PlacementPolicy::kRandom;
hapi::Bucket bucket(std::string("swap_bucket"), hermes, ctx);
Expand All @@ -94,7 +143,6 @@ void TestSwap(std::shared_ptr<Hermes> hermes) {
}

void TestBufferOrganizer(std::shared_ptr<Hermes> hermes) {
namespace hapi = hermes::api;
hapi::Context ctx;
ctx.policy = hapi::PlacementPolicy::kRandom;
hapi::Bucket bucket(std::string("bo_bucket"), hermes, ctx);
Expand Down Expand Up @@ -195,6 +243,8 @@ int main(int argc, char **argv) {
TestGetBuffers(hermes.get());
TestGetBandwidths(&hermes->context_);
hermes->Finalize(true);

TestBlobOverwrite();
}

if (test_swap) {
Expand Down

0 comments on commit 1b08b62

Please sign in to comment.