Skip to content

Commit

Permalink
Merge db6dda8 into 833a555
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopherHogan committed Apr 1, 2021
2 parents 833a555 + db6dda8 commit abf63f9
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 65 deletions.
2 changes: 1 addition & 1 deletion adapter/src/hermes/adapter/stdio/stdio.cc
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ int HERMES_DECL(fclose)(FILE *fp) {
existing.first.st_atim = ts;
existing.first.st_ctim = ts;
mdm->Update(fp, existing.first);
existing.first.st_bkid->Close(ctx);
existing.first.st_bkid->Release(ctx);
}
}
}
Expand Down
11 changes: 9 additions & 2 deletions src/api/bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ Bucket::Bucket(const std::string &initial_name,
}
}

Bucket::~Bucket() {
if (IsValid()) {
Context ctx;
Release(ctx);
}
}

bool Bucket::IsValid() const {
return !IsNullBucketId(id_);
}
Expand Down Expand Up @@ -213,11 +220,11 @@ Status Bucket::Persist(const std::string &file_name, Context &ctx) {
return result;
}

Status Bucket::Close(Context &ctx) {
Status Bucket::Release(Context &ctx) {
(void)ctx;
Status ret;

if (IsValid()) {
if (IsValid() && hermes_->is_initialized) {
LOG(INFO) << "Closing bucket '" << name_ << "'" << std::endl;
DecrementRefcount(&hermes_->context_, &hermes_->rpc_, id_);
id_.as_int = 0;
Expand Down
25 changes: 14 additions & 11 deletions src/api/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,13 @@ class Bucket {
Bucket(const std::string &initial_name, std::shared_ptr<Hermes> const &h,
Context ctx);

~Bucket() {
// TODO(chogan): Should we close implicitly by default?
// Context ctx;
// Close(ctx);

name_.clear();
id_.as_int = 0;
}
/**
* \brief Releases the Bucket, decrementing its reference count
*
* This does not free any resources. To remove the Bucket from the
* MetadataManager and free its stored Blobs, see Bucket::Destroy.
*/
~Bucket();

/** get the name of bucket */
std::string GetName() const {
Expand Down Expand Up @@ -149,9 +148,13 @@ class Bucket {
* The blobs are written in the same order in which they are `Put`. */
Status Persist(const std::string &file_name, Context &ctx);

/** close this bucket and free its associated resources (?) */
/** Invalidates handle */
Status Close(Context &ctx);
/**
* \brief Release this Bucket
*
* This simpley decrements the refcount to this Bucket in the Hermes metadata.
* To free resources associated with this Bucket, call Bucket::Destroy.
*/
Status Release(Context &ctx);

/** destroy this bucket */
/** ctx controls "aggressiveness */
Expand Down
3 changes: 3 additions & 0 deletions src/api/hermes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ void *Hermes::GetAppCommunicator() {
void Hermes::Finalize(bool force_rpc_shutdown) {
hermes::Finalize(&context_, &comm_, &rpc_, shmem_name_.c_str(), &trans_arena_,
IsApplicationCore(), force_rpc_shutdown);
is_initialized = false;
}

void Hermes::RemoteFinalize() {
Expand Down Expand Up @@ -304,6 +305,8 @@ std::shared_ptr<api::Hermes> InitHermes(Config *config, bool is_daemon,
// clients have been initialized.
InitNeighborhoodTargets(&result->context_, &result->rpc_);

result->is_initialized = true;

return result;
}

Expand Down
1 change: 1 addition & 0 deletions src/api/hermes.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class Hermes {
hermes::Arena trans_arena_;
std::string shmem_name_;
std::string rpc_server_name_;
bool is_initialized;

/** if true will do more checks, warnings, expect slower code */
const bool debug_mode_ = true;
Expand Down
1 change: 0 additions & 1 deletion src/api/vbucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ Blob& VBucket::GetBlob(std::string blob_name, std::string bucket_name) {
size_t blob_size = bkt.Get(blob_name, local_blob, ctx);
local_blob.resize(blob_size);
bkt.Get(blob_name, local_blob, ctx);
bkt.Close(ctx);
return local_blob;
}

Expand Down
82 changes: 38 additions & 44 deletions test/bucket_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ int compress_blob(hermes::api::TraitInput &input, hermes::api::Trait *trait) {
size_t blob_size = bkt.Get(input.blob_name, blob, ctx);
blob.resize(blob_size);
bkt.Get(input.blob_name, blob, ctx);
bkt.Close(ctx);
// If Hermes is already linked with a compression library, you can call the
// function directly here. If not, the symbol will have to be dynamically
// loaded and probably stored as a pointer in the Trait.
Expand Down Expand Up @@ -162,6 +161,43 @@ void TestPutOverwrite(std::shared_ptr<hapi::Hermes> hermes) {
bucket.Destroy(ctx);
}

void TestCompressionTrait(std::shared_ptr<hapi::Hermes> hermes) {
hermes::api::Context ctx;
hermes::api::Status status;

const std::string bucket_name = "compression";
hermes::api::Bucket my_bucket(bucket_name, hermes, ctx);
hermes::api::Blob p1(1024*1024*1, 255);
hermes::api::Blob p2(p1);

const std::string blob1_name = "Blob1";
const std::string blob2_name = "Blob2";
Assert(my_bucket.Put(blob1_name, p1, ctx).Succeeded());
Assert(my_bucket.Put(blob2_name, p2, ctx).Succeeded());

Assert(my_bucket.ContainsBlob(blob1_name));
Assert(my_bucket.ContainsBlob(blob2_name));

const std::string vbucket_name = "VB1";
hermes::api::VBucket my_vb(vbucket_name, hermes, false, ctx);
my_vb.Link(blob1_name, bucket_name, ctx);
my_vb.Link(blob2_name, bucket_name, ctx);

Assert(my_vb.Contain_blob(blob1_name, bucket_name) == 1);
Assert(my_vb.Contain_blob(blob2_name, bucket_name) == 1);

MyTrait trait;
trait.compress_level = 6;
// Compression Trait compresses all blobs on Attach
Assert(my_vb.Attach(&trait, ctx).Succeeded());

Assert(my_vb.Unlink(blob1_name, bucket_name, ctx).Succeeded());
Assert(my_vb.Detach(&trait, ctx).Succeeded());

Assert(my_vb.Delete(ctx).Succeeded());
Assert(my_bucket.Destroy(ctx).Succeeded());
}

int main(int argc, char **argv) {
int mpi_threads_provided;
MPI_Init_thread(NULL, NULL, MPI_THREAD_MULTIPLE, &mpi_threads_provided);
Expand All @@ -178,51 +214,9 @@ int main(int argc, char **argv) {
hermes_app = hermes::api::InitHermes(config_file);

if (hermes_app->IsApplicationCore()) {
hermes::api::Context ctx;
hermes::api::Status status;

hermes::api::Bucket my_bucket("compression", hermes_app, ctx);
hermes_app->Display_bucket();
hermes::api::Blob p1(1024*1024*400, 255);
hermes::api::Blob p2(p1);
status = my_bucket.Put("Blob1", p1, ctx);
Assert(status.Succeeded());
status = my_bucket.Put("Blob2", p2, ctx);
Assert(status.Succeeded());

if (my_bucket.ContainsBlob("Blob1"))
std::cout<< "Found Blob1\n";
else
std::cout<< "Not found Blob1\n";
if (my_bucket.ContainsBlob("Blob2"))
std::cout<< "Found Blob2\n";
else
std::cout<< "Not found Blob2\n";

hermes::api::VBucket my_vb("VB1", hermes_app, false, ctx);
hermes_app->Display_vbucket();
my_vb.Link("Blob1", "compression", ctx);
my_vb.Link("Blob2", "compression", ctx);
if (my_vb.Contain_blob("Blob1", "compression") == 1)
std::cout << "Found Blob1 from compression bucket in VBucket VB1\n";
else
std::cout << "Not found Blob1 from compression bucket in VBucket VB1\n";
if (my_vb.Contain_blob("Blob2", "compression") == 1)
std::cout << "Found Blob2 from compression bucket in VBucket VB1\n";
else
std::cout << "Not found Blob2 from compression bucket in VBucket VB1\n";

// compression level
MyTrait trait;
trait.compress_level = 6;
my_vb.Attach(&trait, ctx); // compress action to data starts

TestCompressionTrait(hermes_app);
TestBucketPersist(hermes_app);
TestPutOverwrite(hermes_app);

///////
my_vb.Unlink("Blob1", "VB1", ctx);
my_vb.Detach(&trait, ctx);
} else {
// Hermes core. No user code here.
}
Expand Down
2 changes: 2 additions & 0 deletions test/buffer_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ static void TestBlobOverwrite() {

Assert(buffers_available[slab_index] == 1);

bucket.Destroy(ctx);

hermes->Finalize(true);
}

Expand Down
4 changes: 2 additions & 2 deletions test/end_to_end_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ void TestBulkTransfer(std::shared_ptr<hapi::Hermes> hermes, int app_rank) {

Assert(get_result == put_data);

bucket.Close(ctx);
bucket.Release(ctx);
}

hermes->AppBarrier();
Expand Down Expand Up @@ -107,7 +107,7 @@ int main(int argc, char **argv) {
TestPutGetBucket(shared_bucket, app_rank, app_size);

if (app_rank != 0) {
shared_bucket.Close(ctx);
shared_bucket.Release(ctx);
}

hermes->AppBarrier();
Expand Down
7 changes: 3 additions & 4 deletions test/mdm_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ static void TestLocalGetNextFreeBucketId(HermesPtr hermes) {
for (u32 i = 0; i < mdm->max_buckets; ++i) {
std::string bucket_name = "bucket" + std::to_string(i);
hapi::Bucket bucket(bucket_name, hermes, ctx);
bucket.Close(ctx);
}

std::string fail_name = "this_should_fail";
Expand All @@ -79,7 +78,7 @@ static void TestGetOrCreateBucketId(HermesPtr hermes) {
std::string bucket_name = "bucket";
hapi::Bucket new_bucket(bucket_name, hermes, ctx);
u64 id = new_bucket.GetId();
new_bucket.Close(ctx);
new_bucket.Release(ctx);

hapi::Bucket existing_bucket(bucket_name, hermes, ctx);
Assert(existing_bucket.GetId() == id);
Expand Down Expand Up @@ -126,7 +125,7 @@ static void TestRenameBucket(HermesPtr hermes) {

std::string new_bucket_name = "new_bucket";
bucket.Rename(new_bucket_name, ctx);
bucket.Close(ctx);
bucket.Release(ctx);

hapi::Bucket renamed_bucket(new_bucket_name, hermes, ctx);
size_t blob_size = renamed_bucket.GetBlobSize(&hermes->trans_arena_,
Expand Down Expand Up @@ -155,7 +154,7 @@ static void TestBucketRefCounting(HermesPtr hermes) {
// Bucket should not have been destroyed
Assert(bucket1.IsValid());

bucket1.Close(ctx);
bucket1.Release(ctx);
// Refcount is 1

bucket2.Destroy(ctx);
Expand Down

0 comments on commit abf63f9

Please sign in to comment.