Skip to content

Commit

Permalink
Merge 8c292d3 into d581dab
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopherHogan committed Nov 10, 2021
2 parents d581dab + 8c292d3 commit 808db01
Show file tree
Hide file tree
Showing 11 changed files with 65 additions and 14 deletions.
9 changes: 9 additions & 0 deletions adapter/src/hermes/adapter/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@
*/
const char* kHermesConf = "HERMES_CONF";

/**
* Environment variable used to inform Hermes if this run consists of a separate
* client that needs to attach to an existing daemon (1) or if this run is
* co-deployed with a Hermes core (0). This is only relevant for Hermes jobs
* launched with 1 application process, as Hermes jobs run with >1 MPI ranks
* require a daemon. Defaults to 0.
*/
const char* kHermesClient = "HERMES_CLIENT";

/**
* Specifies whether or not Hermes should eagerly and asynchronously flush
* writes to their final destinations.
Expand Down
6 changes: 3 additions & 3 deletions adapter/src/hermes/adapter/posix/metadata_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ class MetadataManager {
if (ref == 0) {
this->is_mpi = is_mpi;
char* hermes_config = getenv(kHermesConf);
char* hermes_client = getenv(kHermesClient);
if (this->is_mpi) {
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
// TODO(chogan): Need a better way to distinguish between client and
// daemon. https://github.com/HDFGroup/hermes/issues/206
if (comm_size > 1) {

if ((hermes_client && hermes_client[0] == '1') || comm_size > 1) {
hermes = hermes::InitHermesClient(hermes_config);
} else {
this->is_mpi = false;
Expand Down
16 changes: 12 additions & 4 deletions adapter/src/hermes/adapter/posix/posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,23 @@ int simple_open(int ret, const std::string &path_str, int flags) {
stat.st_atim = ts;
stat.st_mtim = ts;
stat.st_ctim = ts;
if (flags & O_APPEND) {
/* FIXME: get current size of bucket from Hermes*/
stat.st_ptr = stat.st_size;
}

/* FIXME(hari) check if this initialization is correct. */
mdm->InitializeHermes();

bool bucket_exists = mdm->GetHermes()->BucketExists(path_str);
// TODO(hari) how to pass to hermes to make a private bucket
stat.st_bkid =
std::make_shared<hapi::Bucket>(path_str, mdm->GetHermes());

if (bucket_exists) {
stat.st_size = stat.st_bkid->GetTotalBlobSize();
}

if (flags & O_APPEND) {
stat.st_ptr = stat.st_size;
}

mdm->Create(ret, stat);
} else {
// TODO(hari): @error_handling invalid fh.
Expand Down
14 changes: 14 additions & 0 deletions src/api/bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,20 @@ Status Bucket::Put(const std::string &name, const u8 *data, size_t size) {
return result;
}

size_t Bucket::GetTotalBlobSize() {
std::vector<BlobID> blob_ids = hermes::GetBlobIds(&hermes_->context_,
&hermes_->rpc_, id_);
api::Context ctx;
size_t result = 0;
for (size_t i = 0; i < blob_ids.size(); ++i) {
ScopedTemporaryMemory scratch(&hermes_->trans_arena_);
result += hermes::GetBlobSizeById(&hermes_->context_, &hermes_->rpc_,
scratch, blob_ids[i]);
}

return result;
}

size_t Bucket::GetBlobSize(Arena *arena, const std::string &name,
const Context &ctx) {
(void)ctx;
Expand Down
3 changes: 3 additions & 0 deletions src/api/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ class Bucket {
/** Returns true if this Bucket has been created but not yet destroyed */
bool IsValid() const;

/** Returns the total size of all Blobs in this Bucket. */
size_t GetTotalBlobSize();

/** Put a blob in this bucket with context */
template<typename T>
Status Put(const std::string &name, const std::vector<T> &data, Context &ctx);
Expand Down
15 changes: 14 additions & 1 deletion src/api/hermes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ bool Hermes::BucketContainsBlob(const std::string &bucket_name,
return result;
}

bool Hermes::BucketExists(const std::string &bucket_name) {
BucketID id = hermes::GetBucketId(&context_, &rpc_, bucket_name.c_str());
bool result = !IsNullBucketId(id);

return result;
}

int Hermes::GetProcessRank() {
int result = comm_.sub_proc_id;

Expand Down Expand Up @@ -238,7 +245,13 @@ BootstrapSharedMemory(Arena *arenas, Config *config, CommunicationContext *comm,

static void InitGlog() {
FLAGS_logtostderr = 1;
FLAGS_minloglevel = 0;
const char kMinLogLevel[] = "GLOG_minloglevel";
char *min_log_level = getenv(kMinLogLevel);

if (!min_log_level) {
FLAGS_minloglevel = 0;
}

FLAGS_v = 0;

google::InitGoogleLogging("hermes");
Expand Down
5 changes: 2 additions & 3 deletions src/api/hermes.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,8 @@ class Hermes {
/** Check if a given bucket contains a blob. */
bool BucketContainsBlob(const std::string &bucket_name,
const std::string &blob_name);

// MPI comms.
// proxy/reference to Hermes core
/** Returns true if @p bucket_name exists in this Hermes instance. */
bool BucketExists(const std::string &bucket_name);
};

class VBucket;
Expand Down
6 changes: 4 additions & 2 deletions src/buffer_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,8 @@ BufferID MakeBufferHeaders(Arena *arena, int buffer_size, u32 start_index,
header->device_id = device_id;

// NOTE(chogan): Stored as offset from base address of shared memory
header->data_offset = buffer_size * j + initial_offset;
header->data_offset =
(ptrdiff_t)buffer_size * (ptrdiff_t)j + initial_offset;

previous->next_free = header->id;
previous = header;
Expand Down Expand Up @@ -1095,7 +1096,8 @@ ptrdiff_t InitBufferPool(u8 *shmem_base, Arena *buffer_pool_arena,
// capacity for buffering (excluding BufferPool metadata).
size_t actual_ram_buffer_capacity = 0;
for (int slab = 0; slab < config->num_slabs[0]; ++slab) {
size_t slab_bytes = buffer_counts[0][slab] * slab_buffer_sizes[0][slab];
size_t slab_bytes =
(size_t)buffer_counts[0][slab] * (size_t)slab_buffer_sizes[0][slab];
actual_ram_buffer_capacity += slab_bytes;
}
config->capacities[0] = actual_ram_buffer_capacity;
Expand Down
2 changes: 1 addition & 1 deletion src/buffer_pool_visualizer/buffer_pool_visualizer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ static int DrawBufferPool(SharedMemoryContext *context,
int num_blocks = header->capacity / block_size;

for (int j = 0; j < num_blocks; ++j) {
std::string index_str = std::to_string(index) + std::to_string(j);
std::string index_str = std::to_string(index) + ":" + std::to_string(j);
auto found = block_refs.find(index_str);
if (found != block_refs.end()) {
block_refs[index_str] += 1;
Expand Down
1 change: 1 addition & 0 deletions src/data_placement_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ Status CalculatePlacement(SharedMemoryContext *context, RpcContext *rpc,
}

if (targets.size() == 0) {
result = DPE_PLACEMENTSCHEMA_EMPTY;
continue;
}

Expand Down
2 changes: 2 additions & 0 deletions src/metadata_storage_stb_ds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ u32 AllocateEmbeddedIdList(MetadataManager *mdm, u32 length) {
std::vector<BlobID> LocalGetBlobIds(SharedMemoryContext *context,
BucketID bucket_id) {
MetadataManager *mdm = GetMetadataManagerFromContext(context);
BeginTicketMutex(&mdm->bucket_mutex);
BucketInfo *info = LocalGetBucketInfoById(mdm, bucket_id);
u32 num_blobs = info->blobs.length;
std::vector<BlobID> result(num_blobs);
Expand All @@ -475,6 +476,7 @@ std::vector<BlobID> LocalGetBlobIds(SharedMemoryContext *context,
result[i] = blob_ids[i];
}
ReleaseIdsPtr(mdm);
EndTicketMutex(&mdm->bucket_mutex);

return result;
}
Expand Down

0 comments on commit 808db01

Please sign in to comment.