Skip to content

Commit

Permalink
Merge 6bdc3f2 into 604d758
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopherHogan committed Feb 22, 2022
2 parents 604d758 + 6bdc3f2 commit f3adad3
Show file tree
Hide file tree
Showing 14 changed files with 185 additions and 22 deletions.
10 changes: 6 additions & 4 deletions ci/cluster/cluster_utils.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,17 @@ function hermes_cluster_up() {
# Change the default hermes.conf file to accommodate multiple nodes and
# store it at ${cluster_conf} on each node.
# 1. Replace "./" mount_points and swap_mount with ${docker_home}
# 2. Change rpc_server_base_name to 'node'
# 2. Use rpc_server_host_file
# 3. Change num_rpc_threads to 4
# 4. Change rpc_host_number_range to {1-2}
docker exec --user ${docker_user} -w ${hermes_build_dir} ${h} \
bash -c "sed -e 's|\"\./\"|\""${docker_home}"\"|g' \
-e 's|\"localhost\"|\"node\"|' \
-e 's|rpc_server_host_file = \"\"|rpc_server_host_file = \"hermes_hosts\"|' \
-e 's|rpc_num_threads = 1|rpc_num_threads = 4|' \
-e 's|{}|{1-2}|' ${conf_path} > ${cluster_conf}"
${conf_path} > ${cluster_conf}"

# Create the hosts file
docker exec --user ${docker_user} -w ${hermes_build_dir} ${h} \
bash -c "echo -e \"${host1}\n${host2}\n\" > hermes_hosts"
# Copy ssh keys to ${docker_home}/.ssh
docker exec ${h} bash -c "cp ${HOME}/.ssh/id_rsa ${docker_home}/.ssh/id_rsa"
docker exec ${h} bash -c "cp ${HOME}/.ssh/id_rsa.pub ${docker_home}/.ssh/id_rsa.pub"
Expand Down
5 changes: 5 additions & 0 deletions ci/cluster/multi_node_ci_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

set -x -e

if [[ "${CI}" != "true" ]]; then
echo "This script is only meant to be run within Github actions"
exit 1
fi

. cluster_utils.sh

# Create ssh keys for the cluster to use
Expand Down
63 changes: 52 additions & 11 deletions src/api/hermes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <sys/mman.h>

#include <cmath>
#include <fstream>

#include "glog/logging.h"

Expand Down Expand Up @@ -170,6 +171,27 @@ ArenaInfo GetArenaInfo(Config *config) {
return result;
}

/**
 * Reads a list of host names from a file, one host name per line.
 *
 * Leading and trailing whitespace (including the '\r' left behind by Windows
 * line endings) is stripped from every line, and lines that are empty after
 * stripping are skipped, so stray blank lines or trailing spaces never turn
 * into bogus host names.
 *
 * @param host_file Path to the file that contains the host names.
 *
 * @return The host names in the order in which they appear in the file. A
 * fatal error is logged if the file cannot be opened.
 */
std::vector<std::string> GetHostsFromFile(const std::string &host_file) {
  static const char kWhitespace[] = " \t\r\n\v\f";

  LOG(INFO) << "Reading hosts from " << host_file;

  // The stream closes itself when it goes out of scope.
  std::ifstream file(host_file);
  if (!file.is_open()) {
    LOG(FATAL) << "Failed to open host file " << host_file;
  }

  std::vector<std::string> result;
  std::string file_line;
  while (std::getline(file, file_line)) {
    const size_t first = file_line.find_first_not_of(kWhitespace);
    if (first == std::string::npos) {
      // Empty or whitespace-only line: not a host name.
      continue;
    }
    const size_t last = file_line.find_last_not_of(kWhitespace);
    result.emplace_back(file_line, first, last - first + 1);
  }

  return result;
}

SharedMemoryContext InitHermesCore(Config *config, CommunicationContext *comm,
ArenaInfo *arena_info, Arena *arenas,
RpcContext *rpc) {
Expand Down Expand Up @@ -204,12 +226,29 @@ SharedMemoryContext InitHermesCore(Config *config, CommunicationContext *comm,
rpc->state = CreateRpcState(&arenas[kArenaType_MetaData]);
mdm->rpc_state_offset = (u8 *)rpc->state - shmem_base;

rpc->host_numbers = PushArray<int>(&arenas[kArenaType_MetaData],
config->host_numbers.size());
for (size_t i = 0; i < config->host_numbers.size(); ++i) {
rpc->host_numbers[i] = config->host_numbers[i];
if (rpc->use_host_file) {
std::vector<std::string> host_names =
GetHostsFromFile(config->rpc_server_host_file);
CHECK_EQ(host_names.size(), rpc->num_nodes);

rpc->host_names = PushArray<ShmemString>(&arenas[kArenaType_MetaData],
host_names.size());

for (size_t i = 0; i < host_names.size(); ++i) {
char *host_name_mem = PushArray<char>(&arenas[kArenaType_MetaData],
host_names[i].size());
MakeShmemString(&rpc->host_names[i], (u8 *)host_name_mem, host_names[i]);
}

mdm->host_names_offset = (u8 *)rpc->host_names - (u8 *)shmem_base;
} else {
rpc->host_numbers = PushArray<int>(&arenas[kArenaType_MetaData],
config->host_numbers.size());
for (size_t i = 0; i < config->host_numbers.size(); ++i) {
rpc->host_numbers[i] = config->host_numbers[i];
}
mdm->host_numbers_offset = (u8 *)rpc->host_numbers - (u8 *)shmem_base;
}
mdm->host_numbers_offset = (u8 *)rpc->host_numbers - (u8 *)shmem_base;

InitMetadataManager(mdm, &arenas[kArenaType_MetaData], config, comm->node_id);
InitMetadataStorage(&context, mdm, &arenas[kArenaType_MetaData], config);
Expand Down Expand Up @@ -299,6 +338,8 @@ std::shared_ptr<api::Hermes> InitHermes(Config *config, bool is_daemon,
rpc.state = (void *)(context.shm_base + mdm->rpc_state_offset);
rpc.host_numbers =
(int *)((u8 *)context.shm_base + mdm->host_numbers_offset);
rpc.host_names =
(ShmemString *)((u8 *)context.shm_base + mdm->host_names_offset);
}

InitFilesForBuffering(&context, comm);
Expand All @@ -317,17 +358,17 @@ std::shared_ptr<api::Hermes> InitHermes(Config *config, bool is_daemon,
// save a reference to the context and rpc instances that are members of the
// Hermes instance.
if (comm.proc_kind == ProcessKind::kHermes) {
std::string host_number = GetHostNumberAsString(&result->rpc_,
result->rpc_.node_id);
std::string rpc_server_addr =
GetRpcAddress(&result->rpc_, config, result->rpc_.node_id,
config->rpc_port);
std::string bo_address =
GetRpcAddress(&result->rpc_, config, result->rpc_.node_id,
config->buffer_organizer_port);

std::string rpc_server_addr = GetRpcAddress(config, host_number,
config->rpc_port);
result->rpc_.start_server(&result->context_, &result->rpc_,
&result->trans_arena_, rpc_server_addr.c_str(),
config->rpc_num_threads);

std::string bo_address = GetRpcAddress(config, host_number,
config->buffer_organizer_port);
StartBufferOrganizer(&result->context_, &result->rpc_,
&result->trans_arena_, bo_address.c_str(),
config->bo_num_threads, config->buffer_organizer_port);
Expand Down
5 changes: 5 additions & 0 deletions src/config_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ static const char *kConfigVariableStrings[ConfigVariable_Count] = {
"max_buckets_per_node",
"max_vbuckets_per_node",
"system_view_state_update_interval_ms",
"rpc_server_host_file",
"rpc_server_base_name",
"rpc_server_suffix",
"buffer_pool_shmem_name",
Expand Down Expand Up @@ -909,6 +910,10 @@ void ParseTokens(TokenList *tokens, Config *config) {
config->system_view_state_update_interval_ms = ParseInt(&tok);
break;
}
case ConfigVariable_RpcServerHostFile: {
config->rpc_server_host_file = ParseString(&tok);
break;
}
case ConfigVariable_RpcServerBaseName: {
config->rpc_server_base_name = ParseString(&tok);
break;
Expand Down
1 change: 1 addition & 0 deletions src/config_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ enum ConfigVariable {
ConfigVariable_MaxBucketsPerNode,
ConfigVariable_MaxVBucketsPerNode,
ConfigVariable_SystemViewStateUpdateInterval,
ConfigVariable_RpcServerHostFile,
ConfigVariable_RpcServerBaseName,
ConfigVariable_RpcServerSuffix,
ConfigVariable_BufferPoolShmemName,
Expand Down
3 changes: 3 additions & 0 deletions src/hermes_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ struct Config {
/** If non-zero, the device is shared among all nodes (e.g., burst buffs) */
int is_shared_device[kMaxDevices];

/** The name of a file that contains host names, 1 per line */
std::string rpc_server_host_file;

/** The hostname of the RPC server, minus any numbers that Hermes may
* auto-generate when the rpc_hostNumber_range is specified. */
std::string rpc_server_base_name;
Expand Down
24 changes: 24 additions & 0 deletions src/metadata_management.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1479,4 +1479,28 @@ bool UnlockBlob(SharedMemoryContext *context, RpcContext *rpc, BlobID blob_id) {
return result;
}

/**
 * Fills out @p sms so that it describes the @p size bytes of @p val, which are
 * copied to @p memory.
 *
 * @param[out] sms The ShmemString instance to be filled out.
 * @param memory The destination of the string bytes. Must be at a higher
 * address than @p sms.
 * @param val The bytes to store (need not be NULL terminated).
 * @param size The number of bytes of @p val to store.
 */
void MakeShmemString(ShmemString *sms, u8 *memory, const char *val, u32 size) {
  // NOTE(chogan): Offset is from the beginning of this ShmemString instance, so
  // the memory for a ShmemString should always be at a higher address than the
  // ShmemString itself. Validate this before writing anything.
  CHECK_LT((u8 *)sms, (u8 *)memory);

  // The offset is stored in a u32, so make sure the distance actually fits
  // rather than silently truncating it.
  const size_t distance = (size_t)((u8 *)memory - (u8 *)sms);
  const size_t max_offset = (size_t)~(u32)0;
  CHECK_LE(distance, max_offset);

  memcpy(memory, val, size);
  sms->size = size;
  sms->offset = (u32)distance;
}

/**
 * Convenience overload that stores the contents of a std::string by forwarding
 * its data pointer and size to the pointer/size overload.
 */
void MakeShmemString(ShmemString *sms, u8 *memory, const std::string &val) {
  const char *bytes = val.data();
  const u32 num_bytes = val.size();
  MakeShmemString(sms, memory, bytes, num_bytes);
}

/**
 * Copies the string described by @p sms into a std::string.
 *
 * An offset smaller than sizeof(ShmemString) or a size of zero is treated as
 * invalid and yields an empty std::string.
 */
std::string GetShmemString(ShmemString *sms) {
  const bool is_valid = sms->offset >= sizeof(ShmemString) && sms->size > 0;
  if (!is_valid) {
    return std::string();
  }

  // The string bytes live `offset` bytes past the ShmemString instance itself.
  const char *chars = (char *)((u8 *)sms + sms->offset);
  return std::string(chars, sms->size);
}

} // namespace hermes
51 changes: 51 additions & 0 deletions src/metadata_management.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,33 @@ static const u32 kGlobalMutexNodeId = 1;

struct RpcContext;

/**
 * Representation of a non-NULL-terminated string in shared memory.
 *
 * Both the ShmemString itself and the memory for the string it represents must
 * reside in the same shared memory segment, and the ShmemString must be stored
 * at a lower address than the string memory because the offset is from the
 * ShmemString instance itself. Here is a diagram:
 *
 * |-----------8 bytes offset ------------|
 * [ ShmemString | offset: 8 | size: 16 ] [ "string in memory" ]
 *
 * @see MakeShmemString()
 * @see GetShmemString()
 *
 */
struct ShmemString {
  /** offset is from the address of this ShmemString instance itself */
  u32 offset;
  /** The size of the string (not NULL terminated) */
  u32 size;

  // Copying and moving are disabled. The offset is relative to the address of
  // this instance, so (presumably) a copy living at a different address would
  // refer to the wrong memory.
  ShmemString(const ShmemString &) = delete;
  ShmemString(ShmemString &&) = delete;
  ShmemString &operator=(const ShmemString &) = delete;
  ShmemString &operator=(ShmemString &&) = delete;
};

enum MapType {
kMapType_Bucket,
kMapType_VBucket,
Expand Down Expand Up @@ -113,6 +140,7 @@ struct MetadataManager {
VBucketID first_free_vbucket;

ptrdiff_t rpc_state_offset;
ptrdiff_t host_names_offset;
ptrdiff_t host_numbers_offset;
ptrdiff_t system_view_state_offset;
ptrdiff_t global_system_view_state_offset;
Expand Down Expand Up @@ -454,6 +482,29 @@ void LocalReplaceBlobIdInBucket(SharedMemoryContext *context,
void ReplaceBlobIdInBucket(SharedMemoryContext *context, RpcContext *rpc,
BucketID bucket_id, BlobID old_blob_id,
BlobID new_blob_id);

/**
* Creates a ShmemString with the value @p val at location @p memory.
*
* @pre The address of @p sms must be lower than @p memory because the offset
* is measured from the beginning of @p sms.
*
* @param[out] sms The ShmemString instance to be filled out.
* @param memory The location in shared memory to store the @p val.
* @param val The string to store.
*/
void MakeShmemString(ShmemString *sms, u8 *memory, const std::string &val);

/**
* Retrieves a ShmemString into a std::string
*
* @param sms The ShmemString that represents the internal string
*
* @return A newly allocated std::string containing a copy of the string from
* shared memory, or an empty std::string if the ShmemString is invalid.
*/
std::string GetShmemString(ShmemString *sms);

} // namespace hermes

#endif // HERMES_METADATA_MANAGEMENT_H_
4 changes: 4 additions & 0 deletions src/rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,13 @@ struct RpcContext {
/** Array of host numbers in shared memory. This size is
* RpcContext::num_nodes */
int *host_numbers;
/** Array of host names stored in shared memory. This array size is
* RpcContext::num_nodes. */
ShmemString *host_names;
u32 node_id;
u32 num_nodes;
int port;
bool use_host_file;
/** The host name without the host number. Allows programmatic construction of
* predictable host names like cluster-node-1, cluster-node-2, etc. without
* storing extra copies of the base hostname.*/
Expand Down
29 changes: 23 additions & 6 deletions src/rpc_thallium.cc
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,10 @@ void InitRpcContext(RpcContext *rpc, u32 num_nodes, u32 node_id,
kMaxServerSuffixSize);

rpc->client_rpc.state_size = sizeof(ClientThalliumState);

if (!config->rpc_server_host_file.empty()) {
rpc->use_host_file = true;
}
}

void *CreateRpcState(Arena *arena) {
Expand Down Expand Up @@ -763,26 +767,39 @@ void FinalizeClient(SharedMemoryContext *context, RpcContext *rpc,
// google::ShutdownGoogleLogging();
}

std::string GetRpcAddress(Config *config, const std::string &host_number,
/**
 * Returns the host name of the node identified by @p node_id.
 *
 * When a host file is in use, the name is looked up in the RpcContext's
 * host_names array. Otherwise it is assembled from the base host name, the
 * node's host number, and the host name suffix.
 *
 * @param rpc The RpcContext that holds the host name information.
 * @param node_id The 1-based node id (0 is reserved as the NULL node).
 *
 * @return The host name, or an empty string if a host file is in use and
 * @p node_id is not in the range [1, rpc->num_nodes].
 */
std::string GetHostNameFromNodeId(RpcContext *rpc, u32 node_id) {
  std::string result;
  if (rpc->use_host_file) {
    // NOTE(chogan): node_id 0 is reserved as the NULL node. Because node_id is
    // unsigned, `node_id - 1` would wrap around for 0 and index far outside of
    // host_names, so only ids in [1, num_nodes] are looked up.
    if (node_id >= 1 && node_id <= rpc->num_nodes) {
      const u32 index = node_id - 1;
      result = GetShmemString(&rpc->host_names[index]);
    }
  } else {
    std::string host_number = GetHostNumberAsString(rpc, node_id);
    result = (std::string(rpc->base_hostname) + host_number +
              std::string(rpc->hostname_suffix));
  }

  return result;
}

std::string GetRpcAddress(RpcContext *rpc, Config *config, u32 node_id,
int port) {
std::string result = config->rpc_protocol + "://";

if (!config->rpc_domain.empty()) {
result += config->rpc_domain + "/";
}
result += (config->rpc_server_base_name + host_number +
config->rpc_server_suffix + ":" + std::to_string(port));
std::string host_name = GetHostNameFromNodeId(rpc, node_id);
result += host_name + ":" + std::to_string(port);

return result;
}

std::string GetServerName(RpcContext *rpc, u32 node_id,
bool is_buffer_organizer) {
ThalliumState *tl_state = GetThalliumState(rpc);
std::string host_name = GetHostNameFromNodeId(rpc, node_id);

std::string host_number = GetHostNumberAsString(rpc, node_id);
std::string host_name = (std::string(rpc->base_hostname) + host_number +
std::string(rpc->hostname_suffix));
// TODO(chogan): @optimization Could cache the last N hostname->IP mappings to
// avoid excessive syscalls. Should profile first.
struct hostent hostname_info = {};
Expand Down
2 changes: 1 addition & 1 deletion src/rpc_thallium.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ void load(A &ar, api::Context &ctx) {
}
} // namespace api

std::string GetRpcAddress(Config *config, const std::string &host_number,
std::string GetRpcAddress(RpcContext *rpc, Config *config, u32 node_id,
int port);

static inline ThalliumState *GetThalliumState(RpcContext *rpc) {
Expand Down
1 change: 1 addition & 0 deletions src/utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ void InitDefaultConfig(Config *config) {

config->num_buffer_organizer_retries = 3;

config->rpc_server_host_file = "";
config->rpc_server_base_name = "localhost";
config->rpc_server_suffix = "";
config->rpc_protocol = "ofi+sockets";
Expand Down
2 changes: 2 additions & 0 deletions test/config_parser_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ void TestDefaultConfig(Arena *arena, const char *config_file) {
Assert(config.buffer_organizer_port == 8081);
Assert(config.rpc_num_threads == 1);

Assert(config.rpc_server_host_file == "");

const char expected_rpc_server_name[] = "localhost";
Assert(config.rpc_server_base_name == expected_rpc_server_name);
Assert(config.rpc_server_suffix.empty());
Expand Down
7 changes: 7 additions & 0 deletions test/data/hermes.conf
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ swap_mount = "./";
# The number of times the buffer organizer will attempt to place a blob from
# swap space into the hierarchy before giving up.
num_buffer_organizer_retries = 3;

# A path to a file containing a list of server names, 1 per line. If your
# servers are named according to a pattern (e.g., server-1, server-2, etc.),
# prefer the `rpc_server_base_name` and `rpc_host_number_range` options. If this
# option is not empty, it will override anything in `rpc_server_base_name`.
rpc_server_host_file = "";

# Base hostname for the RPC servers.
rpc_server_base_name = "localhost";
# RPC server name suffix. This is appended to the base name plus host
Expand Down

0 comments on commit f3adad3

Please sign in to comment.