Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow reading host names from a file #354

Merged
merged 4 commits into from
Feb 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions ci/cluster/cluster_utils.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,17 @@ function hermes_cluster_up() {
# Change the default hermes.conf file to accommodate multiple nodes and
# store it at ${cluster_conf} on each node.
# 1. Replace "./" mount_points and swap_mount with ${docker_home}
# 2. Change rpc_server_base_name to 'node'
# 2. Use rpc_server_host_file
# 3. Change num_rpc_threads to 4
# 4. Change rpc_host_number_range to {1-2}
docker exec --user ${docker_user} -w ${hermes_build_dir} ${h} \
bash -c "sed -e 's|\"\./\"|\""${docker_home}"\"|g' \
-e 's|\"localhost\"|\"node\"|' \
-e 's|rpc_server_host_file = \"\"|rpc_server_host_file = \"hermes_hosts\"|' \
-e 's|rpc_num_threads = 1|rpc_num_threads = 4|' \
-e 's|{}|{1-2}|' ${conf_path} > ${cluster_conf}"
${conf_path} > ${cluster_conf}"

# Create the hosts file
docker exec --user ${docker_user} -w ${hermes_build_dir} ${h} \
bash -c "echo -e \"${host1}\n${host2}\n\" > hermes_hosts"
# Copy ssh keys to ${docker_home}/.ssh
docker exec ${h} bash -c "cp ${HOME}/.ssh/id_rsa ${docker_home}/.ssh/id_rsa"
docker exec ${h} bash -c "cp ${HOME}/.ssh/id_rsa.pub ${docker_home}/.ssh/id_rsa.pub"
Expand Down
5 changes: 5 additions & 0 deletions ci/cluster/multi_node_ci_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

set -x -e

# Refuse to run unless the CI environment variable is exactly "true"; this
# script is only intended for the CI environment.
[[ "${CI}" == "true" ]] || {
  echo "This script is only meant to be run within Github actions"
  exit 1
}

. cluster_utils.sh

# Create ssh keys for the cluster to use
Expand Down
63 changes: 52 additions & 11 deletions src/api/hermes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <sys/mman.h>

#include <cmath>
#include <fstream>

#include "glog/logging.h"

Expand Down Expand Up @@ -170,6 +171,27 @@ ArenaInfo GetArenaInfo(Config *config) {
return result;
}

/**
 * Reads a list of host names from a text file, one host name per line.
 *
 * Leading and trailing whitespace (including the '\r' left behind by CRLF line
 * endings) is stripped from every line, and lines that are empty after
 * stripping are skipped, so stray blank lines do not count as hosts.
 *
 * If the file cannot be opened, a FATAL message is logged.
 *
 * @param host_file Path to the file containing the host names.
 *
 * @return The host names in the order they appear in the file.
 */
std::vector<std::string> GetHostsFromFile(const std::string &host_file) {
  std::vector<std::string> result;
  LOG(INFO) << "Reading hosts from " << host_file;

  // The file is only read, so use an input stream; it is closed automatically
  // when it goes out of scope.
  std::ifstream file(host_file);
  if (!file.is_open()) {
    LOG(FATAL) << "Failed to open host file " << host_file;
  }

  const char *whitespace = " \t\r\n\v\f";
  std::string file_line;
  while (std::getline(file, file_line)) {
    const size_t first = file_line.find_first_not_of(whitespace);
    if (first == std::string::npos) {
      // Empty or whitespace-only line
      continue;
    }
    const size_t last = file_line.find_last_not_of(whitespace);
    result.emplace_back(file_line, first, last - first + 1);
  }

  return result;
}

SharedMemoryContext InitHermesCore(Config *config, CommunicationContext *comm,
ArenaInfo *arena_info, Arena *arenas,
RpcContext *rpc) {
Expand Down Expand Up @@ -204,12 +226,29 @@ SharedMemoryContext InitHermesCore(Config *config, CommunicationContext *comm,
rpc->state = CreateRpcState(&arenas[kArenaType_MetaData]);
mdm->rpc_state_offset = (u8 *)rpc->state - shmem_base;

rpc->host_numbers = PushArray<int>(&arenas[kArenaType_MetaData],
config->host_numbers.size());
for (size_t i = 0; i < config->host_numbers.size(); ++i) {
rpc->host_numbers[i] = config->host_numbers[i];
if (rpc->use_host_file) {
std::vector<std::string> host_names =
GetHostsFromFile(config->rpc_server_host_file);
CHECK_EQ(host_names.size(), rpc->num_nodes);

rpc->host_names = PushArray<ShmemString>(&arenas[kArenaType_MetaData],
host_names.size());

for (size_t i = 0; i < host_names.size(); ++i) {
char *host_name_mem = PushArray<char>(&arenas[kArenaType_MetaData],
host_names[i].size());
MakeShmemString(&rpc->host_names[i], (u8 *)host_name_mem, host_names[i]);
}

mdm->host_names_offset = (u8 *)rpc->host_names - (u8 *)shmem_base;
} else {
rpc->host_numbers = PushArray<int>(&arenas[kArenaType_MetaData],
config->host_numbers.size());
for (size_t i = 0; i < config->host_numbers.size(); ++i) {
rpc->host_numbers[i] = config->host_numbers[i];
}
mdm->host_numbers_offset = (u8 *)rpc->host_numbers - (u8 *)shmem_base;
}
mdm->host_numbers_offset = (u8 *)rpc->host_numbers - (u8 *)shmem_base;

InitMetadataManager(mdm, &arenas[kArenaType_MetaData], config, comm->node_id);
InitMetadataStorage(&context, mdm, &arenas[kArenaType_MetaData], config);
Expand Down Expand Up @@ -299,6 +338,8 @@ std::shared_ptr<api::Hermes> InitHermes(Config *config, bool is_daemon,
rpc.state = (void *)(context.shm_base + mdm->rpc_state_offset);
rpc.host_numbers =
(int *)((u8 *)context.shm_base + mdm->host_numbers_offset);
rpc.host_names =
(ShmemString *)((u8 *)context.shm_base + mdm->host_names_offset);
}

InitFilesForBuffering(&context, comm);
Expand All @@ -317,17 +358,17 @@ std::shared_ptr<api::Hermes> InitHermes(Config *config, bool is_daemon,
// save a reference to the context and rpc instances that are members of the
// Hermes instance.
if (comm.proc_kind == ProcessKind::kHermes) {
std::string host_number = GetHostNumberAsString(&result->rpc_,
result->rpc_.node_id);
std::string rpc_server_addr =
GetRpcAddress(&result->rpc_, config, result->rpc_.node_id,
config->rpc_port);
std::string bo_address =
GetRpcAddress(&result->rpc_, config, result->rpc_.node_id,
config->buffer_organizer_port);

std::string rpc_server_addr = GetRpcAddress(config, host_number,
config->rpc_port);
result->rpc_.start_server(&result->context_, &result->rpc_,
&result->trans_arena_, rpc_server_addr.c_str(),
config->rpc_num_threads);

std::string bo_address = GetRpcAddress(config, host_number,
config->buffer_organizer_port);
StartBufferOrganizer(&result->context_, &result->rpc_,
&result->trans_arena_, bo_address.c_str(),
config->bo_num_threads, config->buffer_organizer_port);
Expand Down
5 changes: 5 additions & 0 deletions src/config_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ static const char *kConfigVariableStrings[ConfigVariable_Count] = {
"max_buckets_per_node",
"max_vbuckets_per_node",
"system_view_state_update_interval_ms",
"rpc_server_host_file",
"rpc_server_base_name",
"rpc_server_suffix",
"buffer_pool_shmem_name",
Expand Down Expand Up @@ -909,6 +910,10 @@ void ParseTokens(TokenList *tokens, Config *config) {
config->system_view_state_update_interval_ms = ParseInt(&tok);
break;
}
case ConfigVariable_RpcServerHostFile: {
config->rpc_server_host_file = ParseString(&tok);
break;
}
case ConfigVariable_RpcServerBaseName: {
config->rpc_server_base_name = ParseString(&tok);
break;
Expand Down
1 change: 1 addition & 0 deletions src/config_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ enum ConfigVariable {
ConfigVariable_MaxBucketsPerNode,
ConfigVariable_MaxVBucketsPerNode,
ConfigVariable_SystemViewStateUpdateInterval,
ConfigVariable_RpcServerHostFile,
ConfigVariable_RpcServerBaseName,
ConfigVariable_RpcServerSuffix,
ConfigVariable_BufferPoolShmemName,
Expand Down
3 changes: 3 additions & 0 deletions src/hermes_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ struct Config {
/** If non-zero, the device is shared among all nodes (e.g., burst buffs) */
int is_shared_device[kMaxDevices];

/** The name of a file that contains host names, 1 per line */
std::string rpc_server_host_file;

/** The hostname of the RPC server, minus any numbers that Hermes may
 * auto-generate when the rpc_host_number_range is specified. */
std::string rpc_server_base_name;
Expand Down
24 changes: 24 additions & 0 deletions src/metadata_management.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1479,4 +1479,28 @@ bool UnlockBlob(SharedMemoryContext *context, RpcContext *rpc, BlobID blob_id) {
return result;
}

/**
 * Fills out @p sms so that it refers to a copy of the first @p size bytes of
 * @p val, which are written to @p memory (no NULL terminator is added).
 */
void MakeShmemString(ShmemString *sms, u8 *memory, const char *val, u32 size) {
  memcpy(memory, val, size);
  sms->size = size;

  u8 *const sms_addr = (u8 *)sms;
  u8 *const string_addr = (u8 *)memory;
  // NOTE(chogan): The offset is measured from the start of this ShmemString
  // instance, so the string memory must always live at a higher address than
  // the ShmemString that describes it.
  CHECK_LT(sms_addr, string_addr);
  sms->offset = string_addr - sms_addr;
}

/** Convenience overload that stores the contents of a std::string. */
void MakeShmemString(ShmemString *sms, u8 *memory, const std::string &val) {
  const char *chars = val.data();
  const u32 num_chars = static_cast<u32>(val.size());
  MakeShmemString(sms, memory, chars, num_chars);
}

/**
 * Copies the characters referred to by @p sms into a std::string. An empty
 * string is returned when the offset is smaller than sizeof(ShmemString) or
 * the size is 0.
 */
std::string GetShmemString(ShmemString *sms) {
  const bool is_valid = sms->offset >= sizeof(ShmemString) && sms->size > 0;
  if (!is_valid) {
    return std::string();
  }

  // The characters live sms->offset bytes past the ShmemString itself.
  const char *chars = (char *)((u8 *)sms + sms->offset);
  return std::string(chars, sms->size);
}

} // namespace hermes
51 changes: 51 additions & 0 deletions src/metadata_management.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,33 @@ static const u32 kGlobalMutexNodeId = 1;

struct RpcContext;

/**
 * Representation of a non-NULL-terminated string in shared memory.
 *
 * Both the ShmemString itself and the memory for the string it represents must
 * reside in the same shared memory segment, and the ShmemString must be stored
 * at a lower address than the string memory because the offset is from the
 * ShmemString instance itself. Here is a diagram:
 *
 * |-----------8 bytes offset ------------|
 * [ ShmemString | offset: 8 | size: 16 ] [ "string in memory" ]
 *
 * Copying and moving are disabled. NOTE(review): presumably because the offset
 * is relative to the instance's own address, so a copy placed elsewhere would
 * refer to the wrong memory.
 *
 * @see MakeShmemString()
 * @see GetShmemString()
 *
 */
struct ShmemString {
  /** offset is from the address of this ShmemString instance itself */
  u32 offset;
  /** The size of the string (not NULL terminated) */
  u32 size;

  ShmemString(const ShmemString &) = delete;
  ShmemString(ShmemString &&) = delete;
  ShmemString& operator=(const ShmemString &) = delete;
  ShmemString& operator=(ShmemString &&) = delete;
};

enum MapType {
kMapType_Bucket,
kMapType_VBucket,
Expand Down Expand Up @@ -113,6 +140,7 @@ struct MetadataManager {
VBucketID first_free_vbucket;

ptrdiff_t rpc_state_offset;
ptrdiff_t host_names_offset;
ptrdiff_t host_numbers_offset;
ptrdiff_t system_view_state_offset;
ptrdiff_t global_system_view_state_offset;
Expand Down Expand Up @@ -454,6 +482,29 @@ void LocalReplaceBlobIdInBucket(SharedMemoryContext *context,
void ReplaceBlobIdInBucket(SharedMemoryContext *context, RpcContext *rpc,
BucketID bucket_id, BlobID old_blob_id,
BlobID new_blob_id);

/**
 * Creates a ShmemString with the value @p val at location @p memory.
 *
 * The characters of @p val are copied to @p memory without a NULL terminator,
 * so @p memory must have room for at least val.size() bytes.
 *
 * @pre The address of @p sms must be lower than @p memory because the offset
 * is from the beginning of @p sms.
 *
 * @param[out] sms The ShmemString instance to be filled out.
 * @param memory The location in shared memory to store the @p val.
 * @param val The string to store.
 */
void MakeShmemString(ShmemString *sms, u8 *memory, const std::string &val);

/**
 * Retrieves a ShmemString into a std::string
 *
 * @param sms The ShmemString that represents the internal string
 *
 * @return A newly allocated std::string containing a copy of the string from
 * shared memory, or an empty std::string if the ShmemString is invalid (its
 * offset is smaller than sizeof(ShmemString) or its size is 0).
 */
std::string GetShmemString(ShmemString *sms);

} // namespace hermes

#endif // HERMES_METADATA_MANAGEMENT_H_
4 changes: 4 additions & 0 deletions src/rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,13 @@ struct RpcContext {
/** Array of host numbers in shared memory. This size is
* RpcContext::num_nodes */
int *host_numbers;
/** Array of host names stored in shared memory. This array size is
* RpcContext::num_nodes. */
ShmemString *host_names;
u32 node_id;
u32 num_nodes;
int port;
bool use_host_file;
/** The host name without the host number. Allows programmatic construction of
* predictable host names like cluster-node-1, cluster-node-2, etc. without
* storing extra copies of the base hostname.*/
Expand Down
29 changes: 23 additions & 6 deletions src/rpc_thallium.cc
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,10 @@ void InitRpcContext(RpcContext *rpc, u32 num_nodes, u32 node_id,
kMaxServerSuffixSize);

rpc->client_rpc.state_size = sizeof(ClientThalliumState);

if (!config->rpc_server_host_file.empty()) {
rpc->use_host_file = true;
}
}

void *CreateRpcState(Arena *arena) {
Expand Down Expand Up @@ -763,26 +767,39 @@ void FinalizeClient(SharedMemoryContext *context, RpcContext *rpc,
// google::ShutdownGoogleLogging();
}

std::string GetRpcAddress(Config *config, const std::string &host_number,
/**
 * Returns the host name of the node identified by @p node_id.
 *
 * When the RpcContext uses a host file, the name is looked up in
 * RpcContext::host_names. Otherwise it is assembled from the base host name,
 * the node's host number, and the host name suffix.
 *
 * @param rpc The RpcContext holding the host name information.
 * @param node_id The 1-based node id (0 is reserved as the NULL node).
 *
 * @return The host name. In host file mode an empty string is returned when
 * @p node_id is 0 or greater than RpcContext::num_nodes, instead of reading
 * outside of the host_names array.
 */
std::string GetHostNameFromNodeId(RpcContext *rpc, u32 node_id) {
  if (!rpc->use_host_file) {
    std::string host_number = GetHostNumberAsString(rpc, node_id);
    return (std::string(rpc->base_hostname) + host_number +
            std::string(rpc->hostname_suffix));
  }

  // NOTE(chogan): node_id 0 is reserved as the NULL node, so valid ids are
  // 1..num_nodes and map to indices 0..num_nodes-1. Guard against ids outside
  // that range: 0 would wrap around to a huge unsigned index.
  if (node_id == 0 || node_id > rpc->num_nodes) {
    return std::string();
  }

  u32 index = node_id - 1;
  return GetShmemString(&rpc->host_names[index]);
}

std::string GetRpcAddress(RpcContext *rpc, Config *config, u32 node_id,
int port) {
std::string result = config->rpc_protocol + "://";

if (!config->rpc_domain.empty()) {
result += config->rpc_domain + "/";
}
result += (config->rpc_server_base_name + host_number +
config->rpc_server_suffix + ":" + std::to_string(port));
std::string host_name = GetHostNameFromNodeId(rpc, node_id);
result += host_name + ":" + std::to_string(port);

return result;
}

std::string GetServerName(RpcContext *rpc, u32 node_id,
bool is_buffer_organizer) {
ThalliumState *tl_state = GetThalliumState(rpc);
std::string host_name = GetHostNameFromNodeId(rpc, node_id);

std::string host_number = GetHostNumberAsString(rpc, node_id);
std::string host_name = (std::string(rpc->base_hostname) + host_number +
std::string(rpc->hostname_suffix));
// TODO(chogan): @optimization Could cache the last N hostname->IP mappings to
// avoid excessive syscalls. Should profile first.
struct hostent hostname_info = {};
Expand Down
2 changes: 1 addition & 1 deletion src/rpc_thallium.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ void load(A &ar, api::Context &ctx) {
}
} // namespace api

std::string GetRpcAddress(Config *config, const std::string &host_number,
std::string GetRpcAddress(RpcContext *rpc, Config *config, u32 node_id,
int port);

static inline ThalliumState *GetThalliumState(RpcContext *rpc) {
Expand Down
1 change: 1 addition & 0 deletions src/utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ void InitDefaultConfig(Config *config) {

config->num_buffer_organizer_retries = 3;

config->rpc_server_host_file = "";
config->rpc_server_base_name = "localhost";
config->rpc_server_suffix = "";
config->rpc_protocol = "ofi+sockets";
Expand Down
2 changes: 2 additions & 0 deletions test/config_parser_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ void TestDefaultConfig(Arena *arena, const char *config_file) {
Assert(config.buffer_organizer_port == 8081);
Assert(config.rpc_num_threads == 1);

Assert(config.rpc_server_host_file == "");

const char expected_rpc_server_name[] = "localhost";
Assert(config.rpc_server_base_name == expected_rpc_server_name);
Assert(config.rpc_server_suffix.empty());
Expand Down
7 changes: 7 additions & 0 deletions test/data/hermes.conf
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ swap_mount = "./";
# The number of times the buffer organizer will attempt to place a blob from
# swap space into the hierarchy before giving up.
num_buffer_organizer_retries = 3;

# A path to a file containing a list of server names, 1 per line. If your
# servers are named according to a pattern (e.g., server-1, server-2, etc.),
# prefer the `rpc_server_base_name` and `rpc_host_number_range` options. If this
# option is not empty, it will override anything in `rpc_server_base_name`.
rpc_server_host_file = "";

# Base hostname for the RPC servers.
rpc_server_base_name = "localhost";
# RPC server name suffix. This is appended to the base name plus host
Expand Down