Skip to content

Commit

Permalink
Factor GetBufferInfo out of ComputeBlobAccessScore and recompute score on each iteration
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopherHogan committed Sep 22, 2021
1 parent b1739de commit 867f0f9
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 46 deletions.
104 changes: 59 additions & 45 deletions src/buffer_organizer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,57 +73,59 @@ static inline f32 BytesToMegabytes(size_t bytes) {
return result;
}

f32 ComputeBlobAccessScore(SharedMemoryContext *context, RpcContext *rpc,
const std::vector<BufferID> &buffer_ids,
std::vector<BufferInfo> &buffer_info,
f32 &total_blob_size_mb) {
f32 result = 0;
total_blob_size_mb = 0;
size_t num_buffers = buffer_ids.size();
std::vector<BufferInfo> GetBufferInfo(SharedMemoryContext *context,
RpcContext *rpc,
const std::vector<BufferID> &buffer_ids) {
std::vector<BufferInfo> result(buffer_ids.size());

if (buffer_info.size() != num_buffers) {
LOG(ERROR) << "Couldn't compute Blob access score. "
<< "Expected buffer_ids.size() to equal buffer_info.size(), but "
<< num_buffers << " != " << buffer_info.size() << "\n";
} else {
f32 raw_score = 0;
for (size_t i = 0; i < num_buffers; ++i) {
BufferID id = buffer_ids[i];
buffer_info[i] = GetBufferInfo(context, rpc, id);
for (size_t i = 0; i < buffer_ids.size(); ++i) {
result[i] = GetBufferInfo(context, rpc, buffer_ids[i]);
}

f32 size_in_mb = BytesToMegabytes(buffer_info[i].size);
f32 seconds_per_mb = 1.0f / buffer_info[i].bandwidth_mbps;
f32 total_seconds = size_in_mb * seconds_per_mb;
return result;
}

total_blob_size_mb += size_in_mb;
raw_score += total_seconds;
}
result = NormalizeAccessScore(context, raw_score, total_blob_size_mb);
/**
 * Compute the access score of a Blob from information about its buffers.
 *
 * Each buffer's size is converted to megabytes and multiplied by the
 * reciprocal of its bandwidth_mbps to get that buffer's access time. The
 * summed access time and the summed size in megabytes are then passed to
 * NormalizeAccessScore, whose result is returned unchanged.
 *
 * @param context The SharedMemoryContext forwarded to NormalizeAccessScore.
 * @param buffer_info One BufferInfo per buffer of the Blob; only the `size`
 *        and `bandwidth_mbps` fields are read here.
 *
 * @return The value produced by NormalizeAccessScore for the accumulated
 *         totals.
 */
f32 ComputeBlobAccessScore(SharedMemoryContext *context,
                           const std::vector<BufferInfo> &buffer_info) {
  f32 blob_size_mb = 0;
  f32 blob_access_seconds = 0;

  for (const BufferInfo &info : buffer_info) {
    f32 buffer_mb = BytesToMegabytes(info.size);
    // Inverse of the bandwidth, i.e. time needed per megabyte.
    // NOTE(review): no guard against bandwidth_mbps == 0 here - confirm that
    // callers never supply such a buffer.
    f32 time_per_mb = 1.0f / info.bandwidth_mbps;
    f32 buffer_seconds = buffer_mb * time_per_mb;

    blob_size_mb += buffer_mb;
    blob_access_seconds += buffer_seconds;
  }

  return NormalizeAccessScore(context, blob_access_seconds, blob_size_mb);
}

void LocalOrganizeBlob(SharedMemoryContext *context, RpcContext *rpc,
const std::string &internal_blob_name, double epsilon) {
const std::string &internal_blob_name, double epsilon,
f32 explicit_importance_score) {
MetadataManager *mdm = GetMetadataManagerFromContext(context);
BlobID blob_id = {};
blob_id.as_int = LocalGet(mdm, internal_blob_name.c_str(), kMapType_BlobId);

f32 importance_score = LocalGetBlobImportanceScore(context, blob_id);
f32 importance_score = explicit_importance_score;
if (explicit_importance_score != -1) {
importance_score = LocalGetBlobImportanceScore(context, blob_id);
}

std::vector<BufferID> buffer_ids = LocalGetBufferIdList(mdm, blob_id);

std::vector<BufferInfo> buffer_info(buffer_ids.size());
f32 total_blob_size_mb = 0;
f32 access_score = ComputeBlobAccessScore(context, rpc, buffer_ids,
buffer_info, total_blob_size_mb);
std::vector<BufferInfo> buffer_info = GetBufferInfo(context, rpc, buffer_ids);
f32 access_score = ComputeBlobAccessScore(context, buffer_info);

bool increasing_access_score = importance_score > access_score;
// f32 score_difference = std::abs(importance_score - access_score);

auto buffer_info_comparator =
[](const BufferInfo &lhs, const BufferInfo &rhs) {
// Sort first by bandwidth, then by size
return (std::tie(lhs.bandwidth_mbps, lhs.size) <
std::tie(rhs.bandwidth_mbps, rhs.size));
};
Expand All @@ -134,22 +136,33 @@ void LocalOrganizeBlob(SharedMemoryContext *context, RpcContext *rpc,
std::sort(buffer_info.rbegin(), buffer_info.rend(), buffer_info_comparator);
}

std::vector<TargetID> targets = LocalGetNodeTargets(context);

for (size_t i = 0; i < buffer_info.size(); ++i) {
// TODO(chogan): Get systemviewstate
// TODO(chogan): GetBuffers
// TODO(chogan): Calculate score based on new buffers, not existing
f32 buffer_size_mb = BytesToMegabytes(buffer_info[i].size);
f32 buffer_access_seconds = buffer_size_mb * buffer_info[i].bandwidth_mbps;
f32 buffer_contribution = buffer_size_mb / total_blob_size_mb;
f32 adjusted_access_seconds = buffer_access_seconds * buffer_contribution;
f32 buffer_access_score =
NormalizeAccessScore(context, adjusted_access_seconds, buffer_size_mb);

if (!increasing_access_score) {
buffer_access_score *= -1;
std::vector<u64> capacities = GetRemainingTargetCapacities(context, rpc,
targets);

std::vector<BufferID> src;
src.push_back(buffer_info[i].id);

PlacementSchema schema;

if (increasing_access_score) {
// TODO(chogan): possibly need to split buffer into smaller
} else {
// TODO(chogan): possibly need to merge buffers into larger
}

std::vector<BufferID> dest = GetBuffers(context, schema);

if (dest.size()) {
} else {
}

f32 new_access_score = access_score + buffer_access_score;
// TODO(chogan): Create new_blob_info
std::vector<BufferInfo> new_blob_info;
f32 new_access_score = ComputeBlobAccessScore(context, new_blob_info);

bool move_is_valid = true;
// Make sure we didn't move too far past the target
if (increasing_access_score) {
Expand All @@ -165,6 +178,7 @@ void LocalOrganizeBlob(SharedMemoryContext *context, RpcContext *rpc,
}

if (move_is_valid) {
// TODO(chogan): Create schema in loop but only enqueue once?
// TODO(chogan): EnqueueBOTask();
}

Expand All @@ -176,13 +190,13 @@ void LocalOrganizeBlob(SharedMemoryContext *context, RpcContext *rpc,

void OrganizeBlob(SharedMemoryContext *context, RpcContext *rpc,
BucketID bucket_id, const std::string &blob_name,
double epsilon) {
double epsilon, f32 importance_score) {
MetadataManager *mdm = GetMetadataManagerFromContext(context);
std::string internal_name = MakeInternalBlobName(blob_name, bucket_id);
u32 target_node = HashString(mdm, rpc, internal_name.c_str());

if (target_node == rpc->node_id) {
LocalOrganizeBlob(context, rpc, internal_name, epsilon);
LocalOrganizeBlob(context, rpc, internal_name, epsilon, importance_score);
} else {
RpcCall<void>(rpc, target_node, "RemoteOrganizeBlob", internal_name,
epsilon);
Expand Down
8 changes: 7 additions & 1 deletion src/buffer_organizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,13 @@ void LocalDecrementFlushCount(SharedMemoryContext *context,
void AwaitAsyncFlushingTasks(SharedMemoryContext *context, RpcContext *rpc,
VBucketID id);

void LocalOrganize(const std::string &blob_name, double epsilon);
/**
 * Reorganize the buffers of a Blob whose metadata is looked up on this node.
 *
 * The definition in buffer_organizer.cc resolves @p internal_blob_name to a
 * BlobID, fetches the Blob's buffer IDs and their BufferInfo, computes the
 * Blob's access score, and compares it with the importance score to decide
 * whether the access score should increase or decrease. The per-buffer move
 * logic in that definition is still largely TODO.
 *
 * @param context The SharedMemoryContext used for metadata and buffer lookups.
 * @param rpc The RpcContext forwarded to the buffer info lookups.
 * @param internal_blob_name The internal name of the Blob to organize.
 * @param epsilon NOTE(review): presumably the allowed distance between the
 *        access score and the importance score - confirm against the
 *        definition.
 * @param explicit_importance_score An importance score supplied by the caller.
 *        NOTE(review): the definition replaces this value with
 *        LocalGetBlobImportanceScore() when it is != -1 and otherwise keeps
 *        -1, which looks inverted relative to OrganizeBlob's default of -1 -
 *        confirm the intended condition.
 */
void LocalOrganizeBlob(SharedMemoryContext *context, RpcContext *rpc,
const std::string &internal_blob_name, double epsilon,
f32 explicit_importance_score);

/**
 * Organize a Blob on whichever node its internal name hashes to.
 *
 * The definition builds the internal Blob name from @p blob_name and
 * @p bucket_id and hashes it to a target node. If the target is this node it
 * calls LocalOrganizeBlob; otherwise it issues a "RemoteOrganizeBlob" RPC to
 * the target node.
 *
 * @param context The SharedMemoryContext for this node.
 * @param rpc The RpcContext; its node_id is compared with the target node.
 * @param bucket_id The Bucket that contains the Blob.
 * @param blob_name The user-facing name of the Blob.
 * @param epsilon Forwarded unchanged to the local or remote organize call.
 * @param importance_score Forwarded to LocalOrganizeBlob; defaults to -1.
 *        NOTE(review): the remote RPC in the definition passes only the
 *        internal name and epsilon, so this value is not forwarded to remote
 *        nodes - confirm whether that is intended.
 */
void OrganizeBlob(SharedMemoryContext *context, RpcContext *rpc,
BucketID bucket_id, const std::string &blob_name,
double epsilon, f32 importance_score = -1);
} // namespace hermes

#endif // HERMES_BUFFER_ORGANIZER_H_

0 comments on commit 867f0f9

Please sign in to comment.