Skip to content

Commit

Permalink
Merge bfef763 into 11ccde3
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopherHogan committed Mar 18, 2021
2 parents 11ccde3 + bfef763 commit 5bea2ac
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 101 deletions.
47 changes: 10 additions & 37 deletions src/api/bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,49 +46,22 @@ bool Bucket::IsValid() const {

Status Bucket::Put(const std::string &name, const u8 *data, size_t size,
Context &ctx) {
Status ret;

if (IsBlobNameTooLong(name)) {
// TODO(chogan): @errorhandling
ret = BLOB_NAME_TOO_LONG;
LOG(ERROR) << ret.Msg();
return ret;
}
Status result;

if (size > 0 && nullptr == data) {
ret = INVALID_BLOB;
LOG(ERROR) << ret.Msg();
return ret;
result = INVALID_BLOB;
LOG(ERROR) << result.Msg();
}

if (IsValid()) {
std::vector<size_t> sizes(1, size);
std::vector<PlacementSchema> schemas;
HERMES_BEGIN_TIMED_BLOCK("CalculatePlacement");
ret = CalculatePlacement(&hermes_->context_, &hermes_->rpc_, sizes, schemas,
ctx);
HERMES_END_TIMED_BLOCK();

if (ret.Succeeded()) {
std::vector<std::string> names(1, name);
std::vector<std::vector<u8>> blobs(1);
blobs[0].resize(size);
// TODO(chogan): Create a PreallocatedMemory allocator for std::vector so
// that a single-blob-Put doesn't perform a copy
memcpy(blobs[0].data(), data, size);
ret = PlaceBlobs(schemas, blobs, names, ctx.buffer_organizer_retries);
} else {
// TODO(chogan): @errorhandling No space left or contraints unsatisfiable.
LOG(ERROR) << ret.Msg();
return ret;
}
} else {
ret = INVALID_BUCKET;
LOG(ERROR) << ret.Msg();
return ret;
if (result.Succeeded()) {
std::vector<std::string> names{name};
// TODO(chogan): Create a PreallocatedMemory allocator for std::vector so
// that a single-blob-Put doesn't perform a copy
std::vector<Blob> blobs{Blob{data, data + size}};
result = Put(names, blobs, ctx);
}

return ret;
return result;
}

size_t Bucket::GetBlobSize(Arena *arena, const std::string &name,
Expand Down
3 changes: 0 additions & 3 deletions src/api/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ Status Bucket::Put(std::vector<std::string> &names,

for (auto &name : names) {
if (IsBlobNameTooLong(name)) {
// TODO(chogan): @errorhandling
ret = BLOB_NAME_TOO_LONG;
LOG(ERROR) << ret.Msg();
return ret;
Expand Down Expand Up @@ -221,12 +220,10 @@ Status Bucket::Put(std::vector<std::string> &names,
if (ret.Succeeded()) {
ret = PlaceBlobs(schemas, blobs, names, ctx.buffer_organizer_retries);
} else {
// TODO(chogan): @errorhandling No space left or contraints unsatisfiable.
LOG(ERROR) << ret.Msg();
return ret;
}
} else {
// TODO(chogan): @errorhandling
ret = INVALID_BUCKET;
LOG(ERROR) << ret.Msg();
return ret;
Expand Down
84 changes: 56 additions & 28 deletions src/data_placement_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -361,45 +361,73 @@ PlacementSchema AggregateBlobSchema(PlacementSchema &schema) {
return result;
}

enum Topology {
Topology_Local,
Topology_Neighborhood,
Topology_Global,

Topology_Count
};

Status CalculatePlacement(SharedMemoryContext *context, RpcContext *rpc,
std::vector<size_t> &blob_sizes,
std::vector<PlacementSchema> &output,
const api::Context &api_context) {
(void)rpc;
std::vector<PlacementSchema> output_tmp;
Status result;

// TODO(chogan): For now we just look at the node level targets as the default
// path. Eventually we will need the ability to escalate to neighborhoods, and
// the entire cluster.

// TODO(chogan): @optimization We can avoid the copy here when getting local
// targets by just getting a pointer and length. I went with a vector just to
// make the interface nicer when we need neighborhood or global targets.
std::vector<TargetID> targets = LocalGetNodeTargets(context);
std::vector<u64> node_state = GetRemainingNodeCapacities(context, targets);

switch (api_context.policy) {
// TODO(KIMMY): check device capacity against blob size
case api::PlacementPolicy::kRandom: {
std::multimap<u64, TargetID> ordered_cap;
for (size_t i = 0; i < node_state.size(); ++i) {
ordered_cap.insert(std::pair<u64, TargetID>(node_state[i], targets[i]));
// NOTE(chogan): Start with local targets and gradually expand the target list
// until the placement succeeds
for (int i = 0; i < Topology_Count; ++i) {
std::vector<TargetID> targets;

switch (i) {
case Topology_Local: {
// TODO(chogan): @optimization We can avoid the copy here when getting
// local targets by just getting a pointer and length.
targets = LocalGetNodeTargets(context);
break;
}
case Topology_Neighborhood: {
targets = GetNeighborhoodTargets(context, rpc);
break;
}
case Topology_Global: {
// TODO(chogan): GetGlobalTargets(context, rpc);
break;
}

result = RandomPlacement(blob_sizes, ordered_cap, output_tmp);
break;
}
case api::PlacementPolicy::kRoundRobin: {
result = RoundRobinPlacement(blob_sizes, node_state,
output_tmp, targets);
break;

std::vector<u64> node_state =
GetRemainingTargetCapacities(context, rpc, targets);

switch (api_context.policy) {
// TODO(KIMMY): check device capacity against blob size
case api::PlacementPolicy::kRandom: {
std::multimap<u64, TargetID> ordered_cap;
for (size_t i = 0; i < node_state.size(); ++i) {
ordered_cap.insert(std::pair<u64, TargetID>(node_state[i],
targets[i]));
}

result = RandomPlacement(blob_sizes, ordered_cap, output_tmp);
break;
}
case api::PlacementPolicy::kRoundRobin: {
result = RoundRobinPlacement(blob_sizes, node_state,
output_tmp, targets);
break;
}
case api::PlacementPolicy::kMinimizeIoTime: {
std::vector<f32> bandwidths = GetBandwidths(context);

result = MinimizeIoTimePlacement(blob_sizes, node_state, bandwidths,
targets, output_tmp);
break;
}
}
case api::PlacementPolicy::kMinimizeIoTime: {
std::vector<f32> bandwidths = GetBandwidths(context);

result = MinimizeIoTimePlacement(blob_sizes, node_state, bandwidths,
targets, output_tmp);
if (!result.Failed()) {
break;
}
}
Expand Down
44 changes: 29 additions & 15 deletions src/metadata_management.cc
Original file line number Diff line number Diff line change
Expand Up @@ -807,27 +807,13 @@ void DecrementRefcount(SharedMemoryContext *context, RpcContext *rpc,
}
}

u64 LocalGetRemainingCapacity(SharedMemoryContext *context, TargetID id) {
u64 LocalGetRemainingTargetCapacity(SharedMemoryContext *context, TargetID id) {
Target *target = GetTargetFromId(context, id);
u64 result = target->remaining_space.load();

return result;
}

u64 GetRemainingCapacity(SharedMemoryContext *context, RpcContext *rpc,
TargetID id) {
u32 target_node = id.bits.node_id;

u64 result = 0;
if (target_node == rpc->node_id) {
result = LocalGetRemainingCapacity(context, id);
} else {
result = RpcCall<u64>(rpc, target_node, "RemoteGetRemainingCapacity", id);
}

return result;
}

SystemViewState *GetLocalSystemViewState(MetadataManager *mdm) {
SystemViewState *result =
(SystemViewState *)((u8 *)mdm + mdm->system_view_state_offset);
Expand Down Expand Up @@ -1185,4 +1171,32 @@ std::vector<TargetID> GetNeighborhoodTargets(SharedMemoryContext *context,

return result;
}

u64 GetRemainingTargetCapacity(SharedMemoryContext *context, RpcContext *rpc,
TargetID target_id) {
u64 result = 0;
u32 target_node = target_id.bits.node_id;

if (target_node == rpc->node_id) {
result = LocalGetRemainingTargetCapacity(context, target_id);
} else {
result = RpcCall<u64>(rpc, target_node, "RemoteGetRemainingTargetCapacity",
target_id);
}

return result;
}

std::vector<u64>
GetRemainingTargetCapacities(SharedMemoryContext *context, RpcContext *rpc,
const std::vector<TargetID> &targets) {
std::vector<u64> result(targets.size());

for (size_t i = 0; i < targets.size(); ++i) {
result[i] = GetRemainingTargetCapacity(context, rpc, targets[i]);
}

return result;
}

} // namespace hermes
11 changes: 11 additions & 0 deletions src/metadata_management.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,17 @@ bool IsVBucketNameTooLong(const std::string &name);
TargetID FindTargetIdFromDeviceId(const std::vector<TargetID> &targets,
DeviceID device_id);

/**
*
*/
std::vector<TargetID> GetNeighborhoodTargets(SharedMemoryContext *context,
RpcContext *rpc);
/**
*
*/
std::vector<u64>
GetRemainingTargetCapacities(SharedMemoryContext *context, RpcContext *rpc,
const std::vector<TargetID> &targets);
/**
*
*/
Expand Down
5 changes: 1 addition & 4 deletions src/metadata_management_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ u64 LocalGet(MetadataManager *mdm, const char *key, MapType map_type);
void LocalPut(MetadataManager *mdm, const char *key, u64 val, MapType map_type);
void LocalDelete(MetadataManager *mdm, const char *key, MapType map_type);

u64 LocalGetRemainingCapacity(SharedMemoryContext *context, TargetID id);
u64 LocalGetRemainingTargetCapacity(SharedMemoryContext *context, TargetID id);
void LocalUpdateGlobalSystemViewState(SharedMemoryContext *context,
std::vector<i64> adjustments);
SystemViewState *GetLocalSystemViewState(SharedMemoryContext *context);
Expand All @@ -90,9 +90,6 @@ void StartGlobalSystemViewStateUpdateThread(SharedMemoryContext *context,
void InitMetadataStorage(SharedMemoryContext *context, MetadataManager *mdm,
Arena *arena, Config *config);

std::vector<u64>
GetRemainingNodeCapacities(SharedMemoryContext *context,
const std::vector<TargetID> &targets);
std::string GetSwapFilename(MetadataManager *mdm, u32 node_id);
std::vector<BlobID> LocalGetBlobIds(SharedMemoryContext *context,
BucketID bucket_id);
Expand Down
12 changes: 0 additions & 12 deletions src/metadata_storage_stb_ds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -583,18 +583,6 @@ size_t GetStoredMapSize(MetadataManager *mdm, MapType map_type) {
return result;
}

std::vector<u64>
GetRemainingNodeCapacities(SharedMemoryContext *context,
const std::vector<TargetID> &targets) {
std::vector<u64> result(targets.size());

for (size_t i = 0; i < targets.size(); ++i) {
result[i] = LocalGetRemainingCapacity(context, targets[i]);
}

return result;
}

u32 HashStringForStorage(MetadataManager *mdm, RpcContext *rpc,
const char *str) {
int result =
Expand Down
5 changes: 3 additions & 2 deletions src/rpc_thallium.cc
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc,
};

auto rpc_get_remaining_capacity = [context](const request &req, TargetID id) {
u64 result = LocalGetRemainingCapacity(context, id);
u64 result = LocalGetRemainingTargetCapacity(context, id);

req.respond(result);
};
Expand Down Expand Up @@ -371,7 +371,8 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc,
rpc_server->define("RemoteDecrementRefcount", rpc_decrement_refcount_bucket);
rpc_server->define("RemoteDecrementRefcountVBucket",
rpc_decrement_refcount_vbucket);
rpc_server->define("RemoteGetRemainingCapacity", rpc_get_remaining_capacity);
rpc_server->define("RemoteGetRemainingTargetCapacity",
rpc_get_remaining_capacity);
rpc_server->define("RemoteUpdateGlobalSystemViewState",
rpc_update_global_system_view_state);
rpc_server->define("RemoteGetGlobalDeviceCapacities",
Expand Down

0 comments on commit 5bea2ac

Please sign in to comment.