Skip to content

Commit

Permalink
Basics of BO organize function
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopherHogan committed Sep 17, 2021
1 parent 5c738c6 commit 8ddadef
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 13 deletions.
84 changes: 80 additions & 4 deletions src/buffer_organizer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,89 @@ namespace hermes {
BufferOrganizer::BufferOrganizer(int num_threads) : pool(num_threads) {
}

void LocalOrganize(const std::string &blob_name, double epsilon) {
(void)blob_name; (void)epsilon;
/**
 * Produces the BufferInfo for a buffer when the request is handled on the
 * calling node (GetBufferInfo routes here when the buffer's node id matches
 * the caller's).
 *
 * Placeholder implementation: neither argument is consulted yet, so every
 * call yields a value-initialized BufferInfo.
 *
 * @param context Shared memory context (currently unused).
 * @param buffer_id The buffer to describe (currently unused).
 * @return A value-initialized BufferInfo.
 */
BufferInfo LocalGetBufferInfo(SharedMemoryContext *context,
                              BufferID buffer_id) {
  // Mark the parameters as intentionally unused until the lookup exists.
  (void)buffer_id;
  (void)context;

  return BufferInfo{};
}

/**
 * Retrieves the BufferInfo for @p buffer_id, dispatching on the node id
 * stored in the BufferID's bits.
 *
 * If that node id equals rpc->node_id the request is served by
 * LocalGetBufferInfo; otherwise it is forwarded to that node through the
 * "RemoteGetBufferInfo" RPC.
 *
 * @param context Shared memory context handed to the local lookup.
 * @param rpc RPC context; supplies this node's id and issues the remote call.
 * @param buffer_id The buffer to describe.
 * @return The BufferInfo reported by the local lookup or by the remote node.
 */
BufferInfo GetBufferInfo(SharedMemoryContext *context, RpcContext *rpc,
                         BufferID buffer_id) {
  u32 owner_node = buffer_id.bits.node_id;

  if (owner_node != rpc->node_id) {
    // NOTE(review): relies on a "RemoteGetBufferInfo" handler being
    // registered with the RPC server; that registration is not visible here.
    return RpcCall<BufferInfo>(rpc, owner_node, "RemoteGetBufferInfo",
                               buffer_id);
  }

  return LocalGetBufferInfo(context, buffer_id);
}

/**
 * Computes the access score of a blob from the buffers that back it.
 *
 * Work in progress: a BufferInfo is fetched for every id in @p buffer_ids
 * (one GetBufferInfo call each, in order), but the collected infos are not
 * used yet and the returned score is always 0.
 *
 * @param context Shared memory context forwarded to GetBufferInfo.
 * @param rpc RPC context forwarded to GetBufferInfo.
 * @param buffer_ids Ids of the buffers that make up the blob.
 * @return The access score (currently always 0).
 */
f32 ComputeBlobAccessScore(SharedMemoryContext *context, RpcContext *rpc,
                           const std::vector<BufferID> &buffer_ids) {
  // TODO(chogan): Sync or async?
  std::vector<BufferInfo> infos;
  infos.reserve(buffer_ids.size());

  for (const BufferID &id : buffer_ids) {
    infos.push_back(GetBufferInfo(context, rpc, id));
  }

  // Get biggest buffer on slowest tier
  f32 score = 0;
  return score;
}

/**
 * Organizes the buffers of one blob on the node that owns its metadata.
 *
 * Work in progress: the blob's id, importance score, buffer id list and
 * access score are looked up, then a loop bounded by the number of buffers
 * stops as soon as the two scores are within @p epsilon of each other. No
 * buffers are moved yet.
 *
 * @param context Shared memory context.
 * @param rpc RPC context forwarded to ComputeBlobAccessScore.
 * @param internal_blob_name Internal name used to look up the BlobID.
 * @param epsilon Largest accepted absolute difference between the importance
 *        score and the access score.
 */
void LocalOrganizeBlob(SharedMemoryContext *context, RpcContext *rpc,
                       const std::string &internal_blob_name, double epsilon) {
  MetadataManager *mdm = GetMetadataManagerFromContext(context);

  BlobID blob_id = {};
  blob_id.as_int = LocalGet(mdm, internal_blob_name.c_str(), kMapType_BlobId);

  // The "node:" notes below are the author's annotations; presumably they
  // record which node each piece of data lives on (TODO confirm).

  // node: internal_blob_name
  const f32 importance = LocalGetBlobImportanceScore(context, blob_id);

  // node: internal_blob_name
  const std::vector<BufferID> ids = LocalGetBufferIdList(mdm, blob_id);

  // node: internal_blob_name
  // node: each buffer's buffer_id
  const f32 access = ComputeBlobAccessScore(context, rpc, ids);

  // Get buffers, sorted by virtual device, then size (or bw?)
  //   - sizes
  //   - Virtual devices (bandwidths)

  for (size_t remaining = ids.size(); remaining > 0; --remaining) {
    // Check the new score.
    // Move if we don't go past epsilon in the opposite direction.
    const bool converged = std::abs(importance - access) < epsilon;
    if (converged) {
      break;
    }
  }

  // TODO(chogan): Moves will be async. Need to wait? No.
}

/**
 * Organizes a blob's buffers on whichever node its internal name hashes to.
 *
 * The internal blob name is built from @p blob_name and @p bucket_id and
 * hashed to a node id. If that is this node, LocalOrganizeBlob runs directly;
 * otherwise the request is sent to the target node through the
 * "RemoteOrganizeBlob" RPC.
 *
 * @param context Shared memory context.
 * @param rpc RPC context; supplies this node's id and issues the remote call.
 * @param bucket_id Bucket that contains the blob.
 * @param blob_name User-facing name of the blob.
 * @param epsilon Tolerance forwarded to the organize routine.
 */
void OrganizeBlob(SharedMemoryContext *context, RpcContext *rpc,
                  BucketID bucket_id, const std::string &blob_name,
                  double epsilon) {
  MetadataManager *mdm = GetMetadataManagerFromContext(context);
  std::string internal_name = MakeInternalBlobName(blob_name, bucket_id);
  u32 owner_node = HashString(mdm, rpc, internal_name.c_str());

  if (owner_node != rpc->node_id) {
    RpcCall<void>(rpc, owner_node, "RemoteOrganizeBlob", internal_name,
                  epsilon);
    return;
  }

  LocalOrganizeBlob(context, rpc, internal_name, epsilon);
}

void LocalShutdownBufferOrganizer(SharedMemoryContext *context) {
Expand Down
6 changes: 6 additions & 0 deletions src/buffer_organizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ struct BoTask {
BoArgs args;
};

/**
 * Per-buffer record returned by GetBufferInfo and LocalGetBufferInfo.
 *
 * Kept as a plain aggregate: it is value-initialized with `= {}` and is
 * serialized field by field for RPC transport.
 */
struct BufferInfo {
/** Identifier of the buffer this record describes. */
BufferID id;
/** Bandwidth associated with the buffer; the name suggests megabytes (or
 * megabits) per second -- TODO confirm units. */
f32 bandwidth_mbps;
/** Size of the buffer; presumably in bytes -- TODO confirm. */
size_t size;
};

struct BufferOrganizer {
ThreadPool pool;

Expand Down
2 changes: 1 addition & 1 deletion src/metadata_management_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ BucketID LocalGetOrCreateBucketId(SharedMemoryContext *context,
const std::string &name);
VBucketID LocalGetOrCreateVBucketId(SharedMemoryContext *context,
const std::string &name);
f32 LocalGetBlobScore(SharedMemoryContext *context, BlobID blob_id);
f32 LocalGetBlobImportanceScore(SharedMemoryContext *context, BlobID blob_id);

/**
* Faster version of std::stoull.
Expand Down
11 changes: 6 additions & 5 deletions src/metadata_storage_stb_ds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@ void LocalRemoveBlobFromVBucketInfo(SharedMemoryContext *context,
EndTicketMutex(&mdm->vbucket_mutex);
}

f32 LocalGetBlobScore(SharedMemoryContext *context, BlobID blob_id) {
f32 LocalGetBlobImportanceScore(SharedMemoryContext *context, BlobID blob_id) {
MetadataManager *mdm = GetMetadataManagerFromContext(context);
Stats stats = LocalGetBlobStats(context, blob_id);

Expand All @@ -967,14 +967,15 @@ f32 LocalGetBlobScore(SharedMemoryContext *context, BlobID blob_id) {
return result;
}

f32 GetBlobScore(SharedMemoryContext *context, RpcContext *rpc,
BlobID blob_id) {
f32 GetBlobImportanceScore(SharedMemoryContext *context, RpcContext *rpc,
BlobID blob_id) {
f32 result = 0;
u32 target_node = GetBlobNodeId(blob_id);
if (target_node == rpc->node_id) {
result = LocalGetBlobScore(context, blob_id);
result = LocalGetBlobImportanceScore(context, blob_id);
} else {
result = RpcCall<f32>(rpc, target_node, "RemoteGetBlobScore", blob_id);
result = RpcCall<f32>(rpc, target_node, "RemoteGetBlobImportanceScore",
blob_id);
}

return result;
Expand Down
7 changes: 4 additions & 3 deletions src/rpc_thallium.cc
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,9 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc,
req.respond(true);
};

auto rpc_get_blob_score =
auto rpc_get_blob_importance_score =
[context](const request &req, BlobID blob_id) {
f32 result = LocalGetBlobScore(context, blob_id);
f32 result = LocalGetBlobImportanceScore(context, blob_id);

req.respond(result);
};
Expand Down Expand Up @@ -476,7 +476,8 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc,
rpc_server->define("RemoteGetBucketNameById",
rpc_get_bucket_name_by_id);
rpc_server->define("RemoteIncrementBlobStats", rpc_increment_blob_stats);
rpc_server->define("RemoteGetBlobScore", rpc_get_blob_score);
rpc_server->define("RemoteGetBlobImportanceScore",
rpc_get_blob_importance_score);
rpc_server->define("RemoteIncrementFlushCount", rpc_increment_flush_count);
rpc_server->define("RemoteDecrementFlushCount", rpc_decrement_flush_count);
rpc_server->define("RemoteGetNumOutstandingFlushingTasks",
Expand Down
7 changes: 7 additions & 0 deletions src/rpc_thallium.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ void serialize(A &ar, SwapBlob &swap_blob) {
ar & swap_blob.bucket_id;
}

/**
 * Lets Thallium know how to serialize a BufferInfo.
 *
 * Applies the archive's operator& to each field in declaration order:
 * id, bandwidth_mbps, size.
 *
 * @param ar The archive the fields are passed to (presumably supplied by
 *        Thallium -- TODO confirm).
 * @param info The BufferInfo whose fields are handed to the archive.
 */
template<typename A>
void serialize(A &ar, BufferInfo &info) {
ar & info.id;
ar & info.bandwidth_mbps;
ar & info.size;
}

#ifndef THALLIUM_USE_CEREAL
/**
* Lets Thallium know how to serialize a MapType.
Expand Down

0 comments on commit 8ddadef

Please sign in to comment.