Merge 03856f2 into f215aaf
ChristopherHogan committed Mar 5, 2021
2 parents f215aaf + 03856f2 commit 2740da2
Showing 14 changed files with 288 additions and 10 deletions.
23 changes: 17 additions & 6 deletions .github/workflows/main.yml
@@ -1,4 +1,3 @@
# This is a basic workflow to help you get started with Actions
# CodeCoverage result can be found at https://coveralls.io/github/HDFGroup/hermes

name: GitHub Actions
@@ -21,7 +20,7 @@ jobs:
# This workflow contains a single job called "build"
build:
# The type of runner that the job will run on
runs-on: ubuntu-18.04
runs-on: ubuntu-20.04

# Steps represent a sequence of tasks that will be executed as part of the job
steps:
@@ -57,11 +56,23 @@ jobs:
run: ci/install_deps.sh

- name: Build and Test
run: ci/install_hermes.sh

- name: Multi-node Test
run: pushd ci/cluster && ./multi_node_ci_test.sh

- name: Generate coverage file
run: |
ci/install_hermes.sh
mkdir -p "${GITHUB_WORKSPACE}/coverage"
pushd build/
lcov -c -d . -o "${GITHUB_WORKSPACE}/coverage/lcov.info"
COVERAGE_DIR=${GITHUB_WORKSPACE}/coverage
mkdir -p ${COVERAGE_DIR}
pushd ${GITHUB_WORKSPACE}/build
lcov -c -d . -o "${COVERAGE_DIR}/tmp.info"
lcov --remove "${COVERAGE_DIR}/tmp.info" \
"/usr/include/*" \
"${HOME}/${LOCAL}/*" \
"*/stb_ds.h" \
-o ${COVERAGE_DIR}/lcov.info
- name: Coveralls
uses: coverallsapp/github-action@master
with:
54 changes: 54 additions & 0 deletions ci/cluster/Dockerfile
@@ -0,0 +1,54 @@
FROM ubuntu:20.04

ENV USER mpirun

ENV DEBIAN_FRONTEND=noninteractive \
HOME=/home/${USER}

RUN apt-get update -y && \
apt-get install -y --no-install-recommends \
sudo \
apt-utils \
&& apt-get install -y --no-install-recommends \
openssh-server \
gcc \
g++ \
libfabric-dev \
mpich \
binutils \
&& apt-get clean && apt-get purge && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*

RUN mkdir /var/run/sshd
RUN echo "root:${USER}" | chpasswd
RUN sed -i 's/PermitRootLogin without-password/PermitRootLogin yes/' /etc/ssh/sshd_config

# SSH login fix. Otherwise user is kicked off after login
RUN sed 's@session\s*required\s*pam_loginuid.so@session optional pam_loginuid.so@g' -i /etc/pam.d/sshd

ENV NOTVISIBLE "in users profile"
RUN echo "export VISIBLE=now" >> /etc/profile

# ------------------------------------------------------------
# Add an 'mpirun' user
# ------------------------------------------------------------

ARG USER_ID
ARG GROUP_ID

RUN addgroup --gid ${GROUP_ID} ${USER}
RUN adduser --disabled-password --gecos "" --uid ${USER_ID} --gid ${GROUP_ID} ${USER} && \
echo "${USER} ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers

# ------------------------------------------------------------
# Set-Up SSH with our Github deploy key
# ------------------------------------------------------------

ENV SSHDIR ${HOME}/.ssh/

RUN mkdir -p ${SSHDIR}
RUN echo "StrictHostKeyChecking no" > ${SSHDIR}/config

RUN chmod -R 600 ${SSHDIR}* && \
chown -R ${USER}:${USER} ${SSHDIR}

CMD ["/usr/sbin/sshd", "-D"]
4 changes: 4 additions & 0 deletions ci/cluster/cluster_down.sh
@@ -0,0 +1,4 @@
#!/bin/bash

. cluster_utils.sh
hermes_cluster_down
4 changes: 4 additions & 0 deletions ci/cluster/cluster_test.sh
@@ -0,0 +1,4 @@
#!/bin/bash

. cluster_utils.sh
hermes_cluster_test ${1:-}
4 changes: 4 additions & 0 deletions ci/cluster/cluster_up.sh
@@ -0,0 +1,4 @@
#!/bin/bash

. cluster_utils.sh
hermes_cluster_up
66 changes: 66 additions & 0 deletions ci/cluster/cluster_utils.sh
@@ -0,0 +1,66 @@
#!/bin/bash

set -x -e

node_names=($(awk '/hostname:/ {print $2}' docker-compose.yml))
docker_user=mpirun
docker_home=/home/${docker_user}
cluster_conf=${docker_home}/hermes.conf
script_dir="$(cd $(dirname "${BASH_SOURCE[0]}") && pwd)"
hermes_build_dir=${script_dir}/../../build
project_name="$(basename ${script_dir})"
host1=${project_name}_${node_names[0]}_1
host2=${project_name}_${node_names[1]}_1


# Build images and start a cluster
function hermes_cluster_up() {
local num_workers=${1:-1}
local conf_path=${script_dir}/../../test/data/hermes.conf

# Build the images, passing our user id and group id so the container user can
# modify the .gcda coverage files
for n in "${node_names[@]}"; do
docker-compose build --build-arg GROUP_ID=$(id -g) --build-arg USER_ID=$(id -u) ${n}
done

docker-compose up -d --scale ${node_names[0]}=1 --scale ${node_names[1]}=${num_workers} --no-recreate

for h in ${host1} ${host2}; do
# Change the default hermes.conf file to accommodate multiple nodes and
# store it at ${cluster_conf} on each node.
# 1. Replace "./" mount_points and swap_mount with ${docker_home}
# 2. Change rpc_server_base_name to 'node'
# 3. Change num_rpc_threads to 4
# 4. Change rpc_host_number_range to {1, 2}
docker exec --user ${docker_user} -w ${hermes_build_dir} ${h} \
bash -c "sed -e 's|\"\./\"|\""${docker_home}"\"|g' \
-e 's|\"localhost\"|\"node\"|' \
-e 's|rpc_num_threads = 1|rpc_num_threads = 4|' \
-e 's|{0, 0}|{1, 2}|' ${conf_path} > ${cluster_conf}"

# Copy ssh keys to ${docker_home}/.ssh
docker exec ${h} bash -c "cp ${HOME}/.ssh/id_rsa ${docker_home}/.ssh/id_rsa"
docker exec ${h} bash -c "cp ${HOME}/.ssh/id_rsa.pub ${docker_home}/.ssh/id_rsa.pub"
docker exec ${h} bash -c "cp ${HOME}/.ssh/id_rsa.pub ${docker_home}/.ssh/authorized_keys"
docker exec ${h} chown -R ${docker_user}:${docker_user} ${docker_home}/.ssh
done
}

function hermes_cluster_test() {
local allocate_tty=${1:-}
local hosts=${host1},${host2}

docker-compose exec ${allocate_tty} \
-e GLOG_vmodule=rpc_thallium=1 \
-e LSAN_OPTIONS=suppressions=../test/data/asan.supp \
--user ${docker_user} \
-w ${hermes_build_dir} \
${node_names[0]} \
mpirun -n 4 -ppn 2 -hosts ${hosts} bin/end_to_end_test ${cluster_conf}
}

# Stop the cluster
function hermes_cluster_down() {
docker-compose down
}
24 changes: 24 additions & 0 deletions ci/cluster/docker-compose.yml
@@ -0,0 +1,24 @@
version: "3"

services:
node1:
build: .
links:
- node2
networks:
- net
volumes:
- $HOME:$HOME
hostname: node1

node2:
build: .
networks:
- net
volumes:
- $HOME:$HOME
hostname: node2

networks:
net:
driver: bridge
17 changes: 17 additions & 0 deletions ci/cluster/multi_node_ci_test.sh
@@ -0,0 +1,17 @@
#!/bin/bash

set -x -e

. cluster_utils.sh

# Create ssh keys for the cluster to use
echo -e 'y\n' | ssh-keygen -q -t rsa -N "" -f ~/.ssh/id_rsa

# Start a two node cluster
hermes_cluster_up

# Run the Hermes tests on the cluster (without allocating a tty)
hermes_cluster_test "-T"

# Shutdown the cluster
hermes_cluster_down
4 changes: 4 additions & 0 deletions src/buffer_pool.cc
@@ -84,6 +84,10 @@ void Finalize(SharedMemoryContext *context, CommunicationContext *comm,
RpcContext *rpc, const char *shmem_name, Arena *trans_arena,
bool is_application_core, bool force_rpc_shutdown) {
WorldBarrier(comm);
if (!is_application_core && comm->first_on_node) {
StopGlobalSystemViewStateUpdateThread(rpc);
}
WorldBarrier(comm);
ShutdownRpcClients(rpc);

if (is_application_core) {
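The ordering of the new lines in Finalize() is deliberate: the first barrier guarantees every rank has finished issuing RPCs before the update thread is torn down, and the second keeps any rank from shutting down its RPC clients while the stop is still in flight. A minimal annotation of the added code (the statements are from this diff; the comments are ours):

  WorldBarrier(comm);                            // every rank is done issuing RPCs
  if (!is_application_core && comm->first_on_node) {
    StopGlobalSystemViewStateUpdateThread(rpc);  // first Hermes rank on each node
  }
  WorldBarrier(comm);                            // stop completes before any client exits
  ShutdownRpcClients(rpc);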
31 changes: 31 additions & 0 deletions src/metadata_management.cc
@@ -275,6 +275,7 @@ BucketID GetOrCreateBucketId(SharedMemoryContext *context, RpcContext *rpc,
const std::string &name) {
MetadataManager *mdm = GetMetadataManagerFromContext(context);

BeginGlobalTicketMutex(context, rpc);
BeginTicketMutex(&mdm->bucket_mutex);
BucketID result = GetBucketIdByName(context, rpc, name.c_str());

@@ -286,6 +287,7 @@ BucketID GetOrCreateBucketId(SharedMemoryContext *context, RpcContext *rpc,
result = GetNextFreeBucketId(context, rpc, name);
}
EndTicketMutex(&mdm->bucket_mutex);
EndGlobalTicketMutex(context, rpc);

return result;
}
@@ -1051,4 +1053,33 @@ std::vector<TargetID> GetNeighborhoodTargets(SharedMemoryContext *context,
return result;
}

void LocalBeginGlobalTicketMutex(MetadataManager *mdm) {
BeginTicketMutex(&mdm->global_mutex);
}

void LocalEndGlobalTicketMutex(MetadataManager *mdm) {
EndTicketMutex(&mdm->global_mutex);
}

void BeginGlobalTicketMutex(SharedMemoryContext *context, RpcContext *rpc) {
if (rpc->node_id == kGlobalMutexNodeId) {
MetadataManager *mdm = GetMetadataManagerFromContext(context);
LocalBeginGlobalTicketMutex(mdm);
} else {
[[maybe_unused]]
bool result = RpcCall<bool>(rpc, kGlobalMutexNodeId,
"RemoteBeginGlobalTicketMutex");
}
}

void EndGlobalTicketMutex(SharedMemoryContext *context, RpcContext *rpc) {
if (rpc->node_id == kGlobalMutexNodeId) {
MetadataManager *mdm = GetMetadataManagerFromContext(context);
LocalEndGlobalTicketMutex(mdm);
} else {
[[maybe_unused]]
bool result = RpcCall<bool>(rpc, kGlobalMutexNodeId,
"RemoteEndGlobalTicketMutex");
}
}
} // namespace hermes
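The dispatch above funnels every acquisition through node 1 (kGlobalMutexNodeId): that node locks its local global_mutex directly, while every other node makes a blocking RPC to it. A hedged usage sketch that mirrors the lock order in GetOrCreateBucketId() (the wrapper function itself is illustrative, not part of the commit):

  // Illustrative only: take the cluster-wide lock before the node-local one
  // and release in reverse order, as GetOrCreateBucketId() does above.
  void ExampleClusterUniqueOperation(SharedMemoryContext *context,
                                     RpcContext *rpc) {
    MetadataManager *mdm = GetMetadataManagerFromContext(context);
    BeginGlobalTicketMutex(context, rpc);  // cluster-wide; owned by node 1
    BeginTicketMutex(&mdm->bucket_mutex);  // node-local
    // ... work that must be unique across the whole cluster ...
    EndTicketMutex(&mdm->bucket_mutex);
    EndGlobalTicketMutex(context, rpc);
  }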
31 changes: 31 additions & 0 deletions src/metadata_management.h
@@ -23,6 +23,8 @@

namespace hermes {

static const u32 kGlobalMutexNodeId = 1;

struct RpcContext;

enum MapType {
@@ -103,6 +105,15 @@ struct MetadataManager {
// TODO(chogan): @optimization Should the TicketMutexes here be reader/writer
// locks?

// TODO(chogan): @optimization Hopefully this is used rarely. If it becomes
// something that's commonly used, we need to come up with something smarter.
/** Mutex shared by all nodes for operations that require synchronization.
*
* Should only be accessed via BeginGlobalTicketMutex() and
* EndGlobalTicketMutex().
*/
TicketMutex global_mutex;

/** Lock for accessing `BucketInfo` structures located at
* `bucket_info_offset` */
TicketMutex bucket_mutex;
@@ -296,6 +307,26 @@ void DecrementRefcount(SharedMemoryContext *context, RpcContext *rpc,
*/
bool IsNullBlobId(BlobID id);

/** Acquires the cluster-wide ticket mutex.
 *
 * On the owning node (`kGlobalMutexNodeId`) the lock is taken locally;
 * every other node acquires it via an RPC to that node.
 */
void BeginGlobalTicketMutex(SharedMemoryContext *context, RpcContext *rpc);

/** Releases the cluster-wide ticket mutex acquired by
 * BeginGlobalTicketMutex().
 */
void EndGlobalTicketMutex(SharedMemoryContext *context, RpcContext *rpc);

/** Locks the node-local `global_mutex` directly. Runs on the owning node,
 * either inline or via the "RemoteBeginGlobalTicketMutex" RPC handler.
 */
void LocalBeginGlobalTicketMutex(MetadataManager *mdm);

/** Unlocks the node-local `global_mutex`; the counterpart of
 * LocalBeginGlobalTicketMutex().
 */
void LocalEndGlobalTicketMutex(MetadataManager *mdm);

} // namespace hermes

#endif // HERMES_METADATA_MANAGEMENT_H_
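Since BeginGlobalTicketMutex() and EndGlobalTicketMutex() must always pair, a scope guard is a natural wrapper on the caller side. A hypothetical sketch (this class is not part of the commit; it only composes the functions declared above):

  // Hypothetical RAII guard so early returns cannot leak the global lock.
  class ScopedGlobalTicketMutex {
   public:
    ScopedGlobalTicketMutex(SharedMemoryContext *context, RpcContext *rpc)
        : context_(context), rpc_(rpc) {
      BeginGlobalTicketMutex(context_, rpc_);
    }
    ~ScopedGlobalTicketMutex() { EndGlobalTicketMutex(context_, rpc_); }

    // Non-copyable: the lock has exactly one owner.
    ScopedGlobalTicketMutex(const ScopedGlobalTicketMutex &) = delete;
    ScopedGlobalTicketMutex &operator=(const ScopedGlobalTicketMutex &) = delete;

   private:
    SharedMemoryContext *context_;
    RpcContext *rpc_;
  };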
32 changes: 29 additions & 3 deletions src/rpc_thallium.cc
@@ -347,6 +347,28 @@ void ThalliumStartRpcServer(SharedMemoryContext *context, RpcContext *rpc,
state->engine->finalize();
};

// TODO(chogan): Only one node needs this. Separate RPC server?
auto rpc_begin_global_ticket_mutex = [context, rpc](const tl::request &req) {
DLOG_ASSERT(rpc->node_id == kGlobalMutexNodeId);
MetadataManager *mdm = GetMetadataManagerFromContext(context);
LocalBeginGlobalTicketMutex(mdm);

req.respond(true);
};

auto rpc_end_global_ticket_mutex = [context, rpc](const tl::request &req) {
DLOG_ASSERT(rpc->node_id == kGlobalMutexNodeId);
MetadataManager *mdm = GetMetadataManagerFromContext(context);
LocalEndGlobalTicketMutex(mdm);

req.respond(true);
};
rpc_server->define("RemoteBeginGlobalTicketMutex",
rpc_begin_global_ticket_mutex);
rpc_server->define("RemoteEndGlobalTicketMutex",
rpc_end_global_ticket_mutex);

// TODO(chogan): Currently these three are only used for testing.
rpc_server->define("GetBuffers", rpc_get_buffers);
rpc_server->define("SplitBuffers", rpc_split_buffers).disable_response();
@@ -498,6 +520,13 @@ void StartGlobalSystemViewStateUpdateThread(SharedMemoryContext *context,
ABT_THREAD_ATTR_NULL, NULL);
}

void StopGlobalSystemViewStateUpdateThread(RpcContext *rpc) {
ThalliumState *state = GetThalliumState(rpc);
state->kill_requested.store(true);
ABT_xstream_join(state->execution_stream);
ABT_xstream_free(&state->execution_stream);
}

void InitRpcContext(RpcContext *rpc, u32 num_nodes, u32 node_id,
Config *config) {
rpc->num_nodes = num_nodes;
@@ -557,9 +586,6 @@ void ShutdownRpcClients(RpcContext *rpc) {

void FinalizeRpcContext(RpcContext *rpc, bool is_daemon) {
ThalliumState *state = GetThalliumState(rpc);
state->kill_requested.store(true);
ABT_xstream_join(state->execution_stream);
ABT_xstream_free(&state->execution_stream);

if (is_daemon) {
state->engine->wait_for_finalize();
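StopGlobalSystemViewStateUpdateThread() (moved here out of FinalizeRpcContext(), per the hunk above) is a standard cooperative shutdown: set an atomic flag, join the worker, then free its stream. A self-contained sketch of the same protocol using std::thread in place of the Argobots execution stream (illustration only; the loop body and sleep interval are assumptions):

  #include <atomic>
  #include <chrono>
  #include <thread>

  struct UpdateThread {
    std::atomic<bool> kill_requested{false};
    std::thread worker;

    void Start() {
      worker = std::thread([this]() {
        while (!kill_requested.load()) {
          // ... aggregate the global system view state ...
          std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
      });
    }

    void Stop() {                  // analogous to StopGlobalSystemViewStateUpdateThread()
      kill_requested.store(true);  // request a cooperative exit
      worker.join();               // analogous to ABT_xstream_join()/ABT_xstream_free()
    }
  };

  int main() {
    UpdateThread t;
    t.Start();
    t.Stop();
    return 0;
  }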
2 changes: 2 additions & 0 deletions src/rpc_thallium.h
@@ -167,6 +167,8 @@ ClientThalliumState *GetClientThalliumState(RpcContext *rpc) {
template<typename ReturnType, typename... Ts>
ReturnType RpcCall(RpcContext *rpc, u32 node_id, const char *func_name,
Ts... args) {
VLOG(1) << "Calling " << func_name << " on node " << node_id
<< " from node " << rpc->node_id << std::endl;
ClientThalliumState *state = GetClientThalliumState(rpc);
std::string server_name = GetServerName(rpc, node_id);
tl::remote_procedure remote_proc = state->engine->define(func_name);
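The new VLOG(1) line traces every cross-node call, including the global-mutex RPCs defined in rpc_thallium.cc. A hedged sketch of a caller (the wrapper below is hypothetical; RpcCall, kGlobalMutexNodeId, and the RPC name are from this diff):

  // Run with GLOG_vmodule=rpc_thallium=1 (as ci/cluster/cluster_utils.sh does)
  // to see: "Calling RemoteBeginGlobalTicketMutex on node 1 from node <id>".
  bool AcquireGlobalMutexRemotely(RpcContext *rpc) {
    return RpcCall<bool>(rpc, kGlobalMutexNodeId,
                         "RemoteBeginGlobalTicketMutex");
  }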