Skip to content

Commit

Permalink
Merge e8b4c0d into 9730d23
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopherHogan committed Apr 2, 2021
2 parents 9730d23 + e8b4c0d commit bf3d284
Show file tree
Hide file tree
Showing 17 changed files with 155 additions and 70 deletions.
10 changes: 7 additions & 3 deletions src/api/bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ namespace api {

Bucket::Bucket(const std::string &initial_name,
const std::shared_ptr<Hermes> &h, Context ctx)
: name_(initial_name), hermes_(h) {
(void)ctx;

: name_(initial_name), hermes_(h), ctx_(ctx) {
if (IsBucketNameTooLong(name_)) {
id_.as_int = 0;
throw std::length_error("Bucket name is too long: " +
Expand Down Expand Up @@ -64,6 +62,12 @@ Status Bucket::Put(const std::string &name, const u8 *data, size_t size,
return result;
}

Status Bucket::Put(const std::string &name, const u8 *data, size_t size) {
Status result = Put(name, data, size, ctx_);

return result;
}

size_t Bucket::GetBlobSize(Arena *arena, const std::string &name,
Context &ctx) {
(void)ctx;
Expand Down
49 changes: 42 additions & 7 deletions src/api/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class Bucket {
public:
/** internal Hermes object owned by Bucket */
std::shared_ptr<Hermes> hermes_;
/** This Bucket's Context. */
Context ctx_;

// TODO(chogan): Think about the Big Three
Bucket() : name_(""), id_{0, 0}, hermes_(nullptr) {
Expand Down Expand Up @@ -72,6 +74,12 @@ class Bucket {
template<typename T>
Status Put(const std::string &name, const std::vector<T> &data, Context &ctx);

/**
*
*/
template<typename T>
Status Put(const std::string &name, const std::vector<T> &data);

/**
* \brief Puts a blob to a bucket
*
Expand All @@ -94,10 +102,7 @@ class Bucket {
/**
*
*/
template<typename T>
Status PlaceBlobs(std::vector<PlacementSchema> &schemas,
const std::vector<std::vector<T>> &blobs,
const std::vector<std::string> &names, int retries);
Status Put(const std::string &name, const u8 *data, size_t size);

/**
*
Expand All @@ -106,6 +111,21 @@ class Bucket {
Status Put(std::vector<std::string> &names,
std::vector<std::vector<T>> &blobs, Context &ctx);

/**
*
*/
template<typename T>
Status Put(std::vector<std::string> &names,
std::vector<std::vector<T>> &blobs);

/**
*
*/
template<typename T>
Status PlaceBlobs(std::vector<PlacementSchema> &schemas,
const std::vector<std::vector<T>> &blobs,
const std::vector<std::string> &names, Context &ctx);

/** Get the size in bytes of the Blob referred to by `name` */
size_t GetBlobSize(Arena *arena, const std::string &name, Context &ctx);

Expand Down Expand Up @@ -166,10 +186,17 @@ Status Bucket::Put(const std::string &name, const std::vector<T> &data,
return result;
}

template<typename T>
Status Bucket::Put(const std::string &name, const std::vector<T> &data) {
Status result = Put(name, data, ctx_);

return result;
}

template<typename T>
Status Bucket::PlaceBlobs(std::vector<PlacementSchema> &schemas,
const std::vector<std::vector<T>> &blobs,
const std::vector<std::string> &names, int retries) {
const std::vector<std::string> &names, Context &ctx) {
Status result;

for (size_t i = 0; i < schemas.size(); ++i) {
Expand All @@ -180,12 +207,20 @@ Status Bucket::PlaceBlobs(std::vector<PlacementSchema> &schemas,
LOG(INFO) << "Attaching blob '" << names[i] << "' to Bucket '" << name_
<< "'" << std::endl;
result = PlaceBlob(&hermes_->context_, &hermes_->rpc_, schema, blob,
names[i], id_, retries);
names[i], id_, ctx);
}

return result;
}

template<typename T>
Status Bucket::Put(std::vector<std::string> &names,
std::vector<std::vector<T>> &blobs) {
Status result = Put(names, blobs, ctx_);

return result;
}

template<typename T>
Status Bucket::Put(std::vector<std::string> &names,
std::vector<std::vector<T>> &blobs, Context &ctx) {
Expand Down Expand Up @@ -218,7 +253,7 @@ Status Bucket::Put(std::vector<std::string> &names,
HERMES_END_TIMED_BLOCK();

if (ret.Succeeded()) {
ret = PlaceBlobs(schemas, blobs, names, ctx.buffer_organizer_retries);
ret = PlaceBlobs(schemas, blobs, names, ctx);
} else {
LOG(ERROR) << ret.Msg();
return ret;
Expand Down
2 changes: 2 additions & 0 deletions src/api/hermes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace hermes {
namespace api {

int Context::default_buffer_organizer_retries;
PlacementPolicy Context::default_placement_policy;

Status RenameBucket(const std::string &old_name,
const std::string &new_name,
Expand Down Expand Up @@ -297,6 +298,7 @@ std::shared_ptr<api::Hermes> InitHermes(Config *config, bool is_daemon,

api::Context::default_buffer_organizer_retries =
config->num_buffer_organizer_retries;
api::Context::default_placement_policy = config->default_placement_policy;

InitRpcClients(&result->rpc_);

Expand Down
22 changes: 0 additions & 22 deletions src/api/hermes.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,28 +83,6 @@ class VBucket;

class Bucket;

enum class PlacementPolicy {
kRandom,
kRoundRobin,
kMinimizeIoTime,
};

struct Context {
static int default_buffer_organizer_retries;

PlacementPolicy policy;
int buffer_organizer_retries;

Context() : policy(PlacementPolicy::kRoundRobin),
buffer_organizer_retries(default_buffer_organizer_retries) {}
};

struct TraitTag{};
typedef ID<TraitTag, int64_t, -1> THnd;

struct BucketTag{};
typedef ID<BucketTag, int64_t, -1> BHnd;

/** rename a bucket referred to by name only */
Status RenameBucket(const std::string &old_name,
const std::string &new_name,
Expand Down
6 changes: 3 additions & 3 deletions src/buffer_organizer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
namespace hermes {

int PlaceInHierarchy(SharedMemoryContext *context, RpcContext *rpc,
SwapBlob swap_blob, const std::string &name) {
SwapBlob swap_blob, const std::string &name,
api::Context &ctx) {
int result = 0;
std::vector<PlacementSchema> schemas;
std::vector<size_t> sizes(1, swap_blob.size);
api::Context ctx;
Status ret = CalculatePlacement(context, rpc, sizes, schemas, ctx);

if (ret.Succeeded()) {
Expand All @@ -35,7 +35,7 @@ int PlaceInHierarchy(SharedMemoryContext *context, RpcContext *rpc,
}

ret = PlaceBlob(context, rpc, schemas[0], blob, name.c_str(),
swap_blob.bucket_id, 0, true);
swap_blob.bucket_id, ctx, true);
} else {
// TODO(chogan): @errorhandling
result = 1;
Expand Down
10 changes: 4 additions & 6 deletions src/buffer_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1645,9 +1645,9 @@ size_t ReadFromSwap(SharedMemoryContext *context, Blob blob,
}

api::Status PlaceBlob(SharedMemoryContext *context, RpcContext *rpc,
PlacementSchema &schema, Blob blob, const std::string &name,
BucketID bucket_id, int retries,
bool called_from_buffer_organizer) {
PlacementSchema &schema, Blob blob,
const std::string &name, BucketID bucket_id,
api::Context &ctx, bool called_from_buffer_organizer) {
api::Status result;

if (ContainsBlob(context, rpc, bucket_id, name)
Expand All @@ -1671,16 +1671,14 @@ api::Status PlaceBlob(SharedMemoryContext *context, RpcContext *rpc,
AttachBlobToBucket(context, rpc, name.c_str(), bucket_id, buffer_ids);
} else {
if (called_from_buffer_organizer) {
// TODO(chogan): @errorhandling The BufferOrganizer failed to place a blob
// from swap space into the hierarchy.
result = PLACE_SWAP_BLOB_TO_BUF_FAILED;
LOG(ERROR) << result.Msg();
} else {
SwapBlob swap_blob = PutToSwap(context, rpc, name, bucket_id, blob.data,
blob.size);
result = BLOB_IN_SWAP_PLACE;
LOG(WARNING) << result.Msg();
TriggerBufferOrganizer(rpc, kPlaceInHierarchy, name, swap_blob, retries);
TriggerBufferOrganizer(rpc, kPlaceInHierarchy, name, swap_blob, ctx);
}
}

Expand Down
6 changes: 4 additions & 2 deletions src/buffer_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -485,10 +485,12 @@ std::vector<f32> GetBandwidths(SharedMemoryContext *context);
u32 GetBufferSize(SharedMemoryContext *context, RpcContext *rpc, BufferID id);
bool BufferIsByteAddressable(SharedMemoryContext *context, BufferID id);
int PlaceInHierarchy(SharedMemoryContext *context, RpcContext *rpc,
SwapBlob swap_blob, const std::string &blob_name);
SwapBlob swap_blob, const std::string &blob_name,
api::Context &ctx);
api::Status PlaceBlob(SharedMemoryContext *context, RpcContext *rpc,
PlacementSchema &schema, Blob blob,
const std::string &name, BucketID bucket_id, int retries,
const std::string &name, BucketID bucket_id,
api::Context &ctx,
bool called_from_buffer_organizer = false);
api::Status StdIoPersistBucket(SharedMemoryContext *context, RpcContext *rpc,
Arena *arena, BucketID bucket_id,
Expand Down
18 changes: 18 additions & 0 deletions src/config_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ enum ConfigVariable {
ConfigVariable_BufferOrganizerPort,
ConfigVariable_RpcHostNumberRange,
ConfigVariable_RpcNumThreads,
ConfigVariable_PlacementPolicy,

ConfigVariable_Count
};
Expand Down Expand Up @@ -115,6 +116,7 @@ static const char *kConfigVariableStrings[ConfigVariable_Count] = {
"buffer_organizer_port",
"rpc_host_number_range",
"rpc_num_threads",
"default_placement_policy",
};

struct Token {
Expand Down Expand Up @@ -817,6 +819,22 @@ void ParseTokens(TokenList *tokens, Config *config) {
config->rpc_num_threads = ParseInt(&tok);
break;
}
case ConfigVariable_PlacementPolicy: {
std::string policy = ParseString(&tok);

if (policy == "MinimizeIoTime") {
config->default_placement_policy =
api::PlacementPolicy::kMinimizeIoTime;
} else if (policy == "Random") {
config->default_placement_policy = api::PlacementPolicy::kRandom;
} else if (policy == "RoundRobin") {
config->default_placement_policy = api::PlacementPolicy::kRoundRobin;
} else {
LOG(FATAL) << "Unknown default_placement_policy: '" << policy << "'"
<< std::endl;
}
break;
}
default: {
HERMES_INVALID_CODE_PATH;
break;
Expand Down
4 changes: 4 additions & 0 deletions src/data_placement_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,10 @@ Status CalculatePlacement(SharedMemoryContext *context, RpcContext *rpc,
}
}

if (targets.size() == 0) {
continue;
}

std::vector<u64> node_state =
GetRemainingTargetCapacities(context, rpc, targets);

Expand Down
28 changes: 14 additions & 14 deletions src/hermes_status.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,29 @@
namespace hermes {

#define RETURN_CODES(X) \
X(2, HERMES_OK_MAX, R"(Maximum supported HERMES success with \
caveat.)") \
X(2, HERMES_OK_MAX, "Maximum supported HERMES success with " \
"caveat.") \
X(1, BLOB_IN_SWAP_PLACE, "Blob is placed into swap place.") \
X(0, HERMES_SUCCESS, "No error!") \
X(-1, INVALID_BUCKET, "Bucket ID is invalid.") \
X(-2, BUCKET_NAME_TOO_LONG, "Bucket name exceeds max length (256).") \
X(-3, VBUCKET_NAME_TOO_LONG, "VBucket name exceeds max length (256).") \
X(-4, BLOB_NAME_TOO_LONG, "Blob name exceeds max length (64).") \
X(-5, INVALID_BLOB, R"(Blob data is invalid. Please check \
address and size.)") \
X(-5, INVALID_BLOB, "Blob data is invalid. Please check " \
"address and size.") \
X(-6, BLOB_NOT_IN_BUCKET, "Blob is not in this bucket.") \
X(-7, BLOB_NOT_LINKED_TO_VBUCKET, "Blob is not linked to this vbucket.") \
X(-8, TRAIT_NOT_VALID, "Selected trait is not valid.") \
X(-9, TRAIT_EXISTS_ALREADY, "Selected trait already exists.") \
X(-10, OFFSET_MAP_EMPTY, "Offset_map is empty.") \
X(-11, BLOB_NOT_LINKED_IN_MAP, "Map doesnt have the blob linked.") \
X(-12, BUCKET_IN_USE, R"(Bucket cannot be destroyed because its \
reference count is greater than 1.)") \
X(-13, DPE_RANDOM_FOUND_NO_TGT, R"(DPE random found no target with enough \
space for blob.)") \
X(-12, BUCKET_IN_USE, "Bucket cannot be destroyed because its "\
"reference count is greater than 1.") \
X(-13, DPE_RANDOM_FOUND_NO_TGT, "DPE random found no target with enough " \
"space for blob.") \
X(-14, DPE_GET_INVALID_TGT, "DPE got an invalid target ID.") \
X(-15, DPE_ORTOOLS_NO_SOLUTION, R"(DPE or-tools does not find a solution \
with provided constraints.)") \
X(-15, DPE_ORTOOLS_NO_SOLUTION, "DPE or-tools does not find a solution" \
"with provided constraints.") \
X(-16, DPE_PLACEMENTSCHEMA_EMPTY, "DPE PlacementSchema is empty.") \
X(-17, READ_BLOB_FAILED, "Read blob from its id failed.") \
X(-18, STDIO_OFFSET_ERROR, "Offset invalid or fseek() failed.") \
Expand All @@ -36,10 +36,10 @@ namespace hermes {
X(-21, STDIO_FCLOSE_FAILED, "Func fclose failed. System err msg: ") \
X(-22, FCLOSE_FAILED, "Func fclose failed. System err msg: ") \
X(-23, INVALID_FILE, "File is not valid.") \
X(-24, PLACE_SWAP_BLOB_TO_BUF_FAILED, R"(Place blob from swap space into \
buffering system failed.)") \
X(-25, DPE_RR_FIND_TGT_FAILED, R"(DPE round-robin failed to find \
proper target for blob.)") \
X(-24, PLACE_SWAP_BLOB_TO_BUF_FAILED, "Place blob from swap space into " \
"buffering system failed.") \
X(-25, DPE_RR_FIND_TGT_FAILED, "DPE round-robin failed to find " \
"proper target for blob.") \
X(-26, HERMES_ERROR_MAX, "Maximum supported HERMES errors.") \

#define RETURN_ENUM(ID, NAME, TEXT) NAME = ID,
Expand Down
19 changes: 19 additions & 0 deletions src/hermes_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,24 @@ typedef u16 DeviceID;

namespace api {
typedef std::vector<unsigned char> Blob;

enum class PlacementPolicy {
kRandom,
kRoundRobin,
kMinimizeIoTime,
};

struct Context {
static int default_buffer_organizer_retries;
static PlacementPolicy default_placement_policy;

PlacementPolicy policy;
int buffer_organizer_retries;

Context() : policy(default_placement_policy),
buffer_organizer_retries(default_buffer_organizer_retries) {}
};

} // namespace api

// TODO(chogan): These constants impose limits on the number of slabs, devices,
Expand Down Expand Up @@ -160,6 +178,7 @@ struct Config {
int buffer_organizer_port;
int rpc_host_number_range[2];
int rpc_num_threads;
api::PlacementPolicy default_placement_policy;

/** A base name for the BufferPool shared memory segement. Hermes appends the
* value of the USER environment variable to this string.
Expand Down
Loading

0 comments on commit bf3d284

Please sign in to comment.