Skip to content

Commit

Permalink
omp: elastically scale-up loops (#870)
Browse files Browse the repository at this point in the history
* omp: elastically scale-up loops

* openmp: fix the next level's size for thread 0

* openmp: fix single-thread forks

* openmp: add concurrent app test

* gh: bump faabric dep

* faasmctl: bump to version 0.43.0

* gh: bump code version to 0.27.0

* openmp: fixes for single-threaded elastic execution

* omp: fix race-conditions with elastic scaling at scale

* gh: bump cpp and faabric after merge to main
  • Loading branch information
csegarragonz committed May 14, 2024
1 parent 3ccc453 commit 2f17f92
Show file tree
Hide file tree
Showing 27 changed files with 380 additions and 84 deletions.
10 changes: 5 additions & 5 deletions .env
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
FAASM_VERSION=0.26.0
FAASM_CLI_IMAGE=faasm.azurecr.io/cli:0.26.0
FAASM_WORKER_IMAGE=faasm.azurecr.io/worker:0.26.0
FAASM_VERSION=0.27.0
FAASM_CLI_IMAGE=faasm.azurecr.io/cli:0.27.0
FAASM_WORKER_IMAGE=faasm.azurecr.io/worker:0.27.0

FAABRIC_VERSION=0.19.0
FAABRIC_PLANNER_IMAGE=faasm.azurecr.io/planner:0.19.0
FAABRIC_VERSION=0.20.0
FAABRIC_PLANNER_IMAGE=faasm.azurecr.io/planner:0.20.0

CPP_VERSION=0.5.0
CPP_CLI_IMAGE=faasm.azurecr.io/cpp-sysroot:0.5.0
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/azure.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ jobs:
env:
CLUSTER_NAME_BASE: gha-cluster
FAASM_INI_FILE: ./faasm.ini
FAASM_VERSION: 0.26.0
FAASM_VERSION: 0.27.0
FAASM_WASM_VM: ${{ matrix.wasm_vm }}
steps:
- name: "Check out the experiment-base code"
Expand All @@ -56,7 +56,7 @@ jobs:
./bin/inv_wrapper.sh cluster.credentials --name ${{ env.CLUSTER_NAME }}
working-directory: ${{ github.workspace }}/experiment-base
- name: "Install faasmctl"
run: source ./bin/workon.sh && pip3 install faasmctl==0.39.0
run: source ./bin/workon.sh && pip3 install faasmctl==0.43.0
working-directory: ${{ github.workspace }}/experiment-base
- name: "Deploy Faasm on k8s cluster"
run: source ./bin/workon.sh && faasmctl deploy.k8s --workers=1
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/sgx_hw.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
runs-on: self-hosted
env:
VM_BASE_NAME: gha-sgx-hw-vm
FAASM_VERSION: 0.26.0
FAASM_VERSION: 0.27.0
steps:
- name: "Check out the experiment-base code"
uses: actions/checkout@v4
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ concurrency:
# Top-level env. vars shared by most jobs
env:
CONAN_CACHE_MOUNT_SOURCE: .conan
FAABRIC_VERSION: 0.19.0
FAABRIC_VERSION: 0.20.0
FAASM_INI_FILE: ./faasm.ini
FAASM_VERSION: 0.26.0
FAASMCTL_VERSION: 0.39.0
FAASM_VERSION: 0.27.0
FAASMCTL_VERSION: 0.43.0

jobs:
checks:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
container:
image: faasm.azurecr.io/cli:0.26.0
image: faasm.azurecr.io/cli:0.27.0
steps:
- name: "Checkout code"
uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.26.0
0.27.0
2 changes: 1 addition & 1 deletion clients/cpp
2 changes: 1 addition & 1 deletion deploy/k8s-common/minio.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ spec:
- control
containers:
- name: minio-main
image: faasm.azurecr.io/minio:0.26.0
image: faasm.azurecr.io/minio:0.27.0
env:
- name: MINIO_ROOT_USER
value: "minio"
Expand Down
2 changes: 1 addition & 1 deletion deploy/k8s-common/planner.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ spec:
- control
containers:
- name: planner
image: faasm.azurecr.io/planner:0.19.0
image: faasm.azurecr.io/planner:0.20.0
ports:
- containerPort: 8081
env:
Expand Down
4 changes: 2 additions & 2 deletions deploy/k8s-common/redis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ spec:
- control
containers:
- name: master
image: faasm.azurecr.io/redis:0.26.0
image: faasm.azurecr.io/redis:0.27.0
ports:
- containerPort: 6379

Expand All @@ -46,7 +46,7 @@ spec:
- control
containers:
- name: master
image: faasm.azurecr.io/redis:0.26.0
image: faasm.azurecr.io/redis:0.27.0
ports:
- containerPort: 6379

Expand Down
2 changes: 1 addition & 1 deletion deploy/k8s-sgx/upload.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ spec:
- control
containers:
- name: upload
image: faasm.azurecr.io/upload:0.26.0
image: faasm.azurecr.io/upload:0.27.0
ports:
- containerPort: 8002
- containerPort: 5000
Expand Down
2 changes: 1 addition & 1 deletion deploy/k8s-sgx/worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ spec:
weight: 100

containers:
- image: faasm.azurecr.io/worker-sgx:0.26.0
- image: faasm.azurecr.io/worker-sgx:0.27.0
name: faasm-worker
ports:
- containerPort: 8080
Expand Down
2 changes: 1 addition & 1 deletion deploy/k8s-wamr/upload.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ spec:
- control
containers:
- name: upload
image: faasm.azurecr.io/upload:0.26.0
image: faasm.azurecr.io/upload:0.27.0
ports:
- containerPort: 8002
- containerPort: 5000
Expand Down
2 changes: 1 addition & 1 deletion deploy/k8s-wamr/worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ spec:
weight: 100

containers:
- image: faasm.azurecr.io/worker:0.26.0
- image: faasm.azurecr.io/worker:0.27.0
name: faasm-worker
ports:
- containerPort: 8080
Expand Down
2 changes: 1 addition & 1 deletion deploy/k8s/upload.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ spec:
- control
containers:
- name: upload
image: faasm.azurecr.io/upload:0.26.0
image: faasm.azurecr.io/upload:0.27.0
ports:
- containerPort: 8002
- containerPort: 5000
Expand Down
2 changes: 1 addition & 1 deletion deploy/k8s/worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ spec:
weight: 100

containers:
- image: faasm.azurecr.io/worker:0.26.0
- image: faasm.azurecr.io/worker:0.27.0
name: faasm-worker
ports:
- containerPort: 8080
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
black>=23.12.0
breathe>=4.35.0
faasmctl==0.39.0
faasmctl==0.43.0
flake8>=7.0.0
invoke>=2.0.0
myst_parser>=2.0.0
Expand Down
10 changes: 10 additions & 0 deletions src/threads/ThreadState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ void setCurrentOpenMPLevel(
std::string funcStr = faabric::util::funcToString(req);

currentLevel = levelFromBatchRequest(req);

// If scaling elastically, check if we are oversubscribed (i.e. there are
// more threads than originally expected)
if (req->elasticscalehint()) {
auto& broker = faabric::transport::getPointToPointBroker();
int actualGroupSize =
broker.getIdxsRegisteredForGroup(req->groupid()).size();
currentLevel->numThreads = actualGroupSize;
}

SPDLOG_TRACE(
"Deserialised thread-local OpenMP level from {} bytes for {}, {}",
req->contextdata().size(),
Expand Down
11 changes: 9 additions & 2 deletions src/wamr/WAMRWasmModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,11 +355,18 @@ int32_t WAMRWasmModule::executeOMPThread(int threadPoolIdx,
{
auto funcStr = faabric::util::funcToString(msg, false);
int wasmFuncPtr = msg.funcptr();
SPDLOG_DEBUG("Executing OpenMP thread {} for {}", threadPoolIdx, funcStr);
SPDLOG_DEBUG("Executing OpenMP thread {} for {} (app: {}, funcptr: {}))",
threadPoolIdx,
funcStr,
msg.appid(),
wasmFuncPtr);

auto* execEnv = execEnvs.at(threadPoolIdx);
if (execEnvs.at(threadPoolIdx) == nullptr) {
SPDLOG_ERROR("Exec. env not set for thread: {}!", threadPoolIdx);
SPDLOG_ERROR("Exec. env not set for thread: {}:{} (app: {})!",
funcStr,
threadPoolIdx,
msg.appid());
throw std::runtime_error("Thread execution environment not set!");
}

Expand Down
15 changes: 13 additions & 2 deletions src/wamr/filesystem.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#include <conf/FaasmConfig.h>
#include <faabric/executor/ExecutorContext.h>
#include <faabric/util/logging.h>
#include <storage/FileDescriptor.h>
#include <wamr/WAMRWasmModule.h>
#include <wamr/native.h>
#include <wamr/types.h>

#include <cstring>
#include <stdexcept>
#include <string>
#include <sys/uio.h>

Expand Down Expand Up @@ -389,7 +389,18 @@ static int32_t wasi_fd_write(wasm_exec_env_t exec_env,
conf::FaasmConfig& conf = conf::getFaasmConfig();
bool isStd = fd <= 2;
if (isStd && conf.captureStdout == "on") {
module->captureStdout(ioVecBuffNative.data(), ioVecCountWasm);
try {
module->captureStdout(ioVecBuffNative.data(), ioVecCountWasm);
} catch (std::exception& e) {
auto msg = faabric::executor::ExecutorContext::get()->getMsg();
SPDLOG_ERROR("{}:{}:{} Failed to capture stdout",
msg.appid(),
msg.groupid(),
msg.groupidx());

// Re-throw in a WAMR-safe way
module->doThrowException(e);
}
}

return __WASI_ESUCCESS;
Expand Down
53 changes: 36 additions & 17 deletions src/wamr/openmp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,46 @@
#include <wamr/native.h>
#include <wasm_export.h>

// Evaluates `call` and, if it throws a std::exception, catches it and passes
// it to doThrowException() on the module returned by
// wasm::getExecutingWAMRModule(). The macro itself yields no value, so it is
// meant for call sites that do not need the result of `call`.
//
// The body is wrapped in do { ... } while (0) so that the expansion together
// with the caller's trailing semicolon forms exactly one statement; this keeps
// the macro safe inside an unbraced if/else. Local names avoid the reserved
// double-underscore prefix.
#define CALL_OPENMP_CATCH_EXCETION_NO_RETURN(call)                             \
    do {                                                                       \
        try {                                                                  \
            call;                                                              \
        } catch (std::exception & ex) {                                        \
            auto executingModule = wasm::getExecutingWAMRModule();             \
            executingModule->doThrowException(ex);                             \
        }                                                                      \
    } while (0)

namespace wasm {
static void __kmpc_barrier_wrapper(wasm_exec_env_t execEnv,
int32_t loc,
int32_t globalTid)
{
wasm::doOpenMPBarrier(loc, globalTid);
CALL_OPENMP_CATCH_EXCETION_NO_RETURN(wasm::doOpenMPBarrier(loc, globalTid));
}

static void __kmpc_critical_wrapper(wasm_exec_env_t execEnv,
int32_t loc,
int32_t globalTid,
int32_t crit)
{
wasm::doOpenMPCritical(loc, globalTid, crit);
CALL_OPENMP_CATCH_EXCETION_NO_RETURN(
wasm::doOpenMPCritical(loc, globalTid, crit));
}

static void __kmpc_end_critical_wrapper(wasm_exec_env_t execEnv,
int32_t loc,
int32_t globalTid,
int32_t crit)
{
wasm::doOpenMPEndCritical(loc, globalTid, crit);
CALL_OPENMP_CATCH_EXCETION_NO_RETURN(
wasm::doOpenMPEndCritical(loc, globalTid, crit));
}

static void __kmpc_end_master_wrapper(wasm_exec_env_t execEnv,
int32_t loc,
int32_t globalTid)
{
wasm::doOpenMPEndMaster(loc, globalTid);
CALL_OPENMP_CATCH_EXCETION_NO_RETURN(
wasm::doOpenMPEndMaster(loc, globalTid));
}

static void __kmpc_end_reduce_wrapper(wasm_exec_env_t execEnv,
Expand All @@ -41,7 +52,8 @@ static void __kmpc_end_reduce_wrapper(wasm_exec_env_t execEnv,
int32_t lck)
{
OMP_FUNC_ARGS("__kmpc_end_reduce {} {} {}", loc, gtid, lck);
wasm::doOpenMPEndReduceCritical(msg, true);
CALL_OPENMP_CATCH_EXCETION_NO_RETURN(
wasm::doOpenMPEndReduceCritical(msg, true));
}

static void __kmpc_end_reduce_nowait_wrapper(wasm_exec_env_t execEnv,
Expand All @@ -50,19 +62,21 @@ static void __kmpc_end_reduce_nowait_wrapper(wasm_exec_env_t execEnv,
int32_t lck)
{
OMP_FUNC_ARGS("__kmpc_end_reduce_nowait {} {} {}", loc, gtid, lck);
wasm::doOpenMPEndReduceCritical(msg, false);
CALL_OPENMP_CATCH_EXCETION_NO_RETURN(
wasm::doOpenMPEndReduceCritical(msg, false));
}

static void __kmpc_end_single_wrapper(wasm_exec_env_t execEnv,
int32_t loc,
int32_t globalTid)
{
wasm::doOpenMPEndSingle(loc, globalTid);
CALL_OPENMP_CATCH_EXCETION_NO_RETURN(
wasm::doOpenMPEndSingle(loc, globalTid));
}

static void __kmpc_flush_wrapper(wasm_exec_env_t execEnv, int32_t loc)
{
wasm::doOpenMPFlush(loc);
CALL_OPENMP_CATCH_EXCETION_NO_RETURN(wasm::doOpenMPFlush(loc));
}

static int32_t __kmpc_single_wrapper(wasm_exec_env_t execEnv,
Expand All @@ -76,7 +90,8 @@ static void __kmpc_for_static_fini_wrapper(wasm_exec_env_t execEnv,
int32_t loc,
int32_t gtid)
{
wasm::doOpenMPForStaticFini(loc, gtid);
CALL_OPENMP_CATCH_EXCETION_NO_RETURN(
wasm::doOpenMPForStaticFini(loc, gtid));
}

static void __kmpc_for_static_init_4_wrapper(wasm_exec_env_t execEnv,
Expand All @@ -90,8 +105,8 @@ static void __kmpc_for_static_init_4_wrapper(wasm_exec_env_t execEnv,
int32_t incr,
int32_t chunk)
{
wasm::doOpenMPForStaticInit4(
loc, gtid, schedule, lastIter, lower, upper, stride, incr, chunk);
CALL_OPENMP_CATCH_EXCETION_NO_RETURN(wasm::doOpenMPForStaticInit4(
loc, gtid, schedule, lastIter, lower, upper, stride, incr, chunk));
}

static void __kmpc_for_static_init_8_wrapper(wasm_exec_env_t execEnv,
Expand All @@ -105,8 +120,8 @@ static void __kmpc_for_static_init_8_wrapper(wasm_exec_env_t execEnv,
int64_t incr,
int64_t chunk)
{
wasm::doOpenMPForStaticInit8(
loc, gtid, schedule, lastIter, lower, upper, stride, incr, chunk);
CALL_OPENMP_CATCH_EXCETION_NO_RETURN(wasm::doOpenMPForStaticInit8(
loc, gtid, schedule, lastIter, lower, upper, stride, incr, chunk));
}

static void __kmpc_fork_call_wrapper(wasm_exec_env_t execEnv,
Expand All @@ -125,6 +140,8 @@ static void __kmpc_fork_call_wrapper(wasm_exec_env_t execEnv,
// Create child thread's execution environments
wamrModule->createThreadsExecEnv(execEnv);

// Fork is complex enough that we try/catch different exceptions inside
// to avoid missing relevant errors
wasm::doOpenMPFork(locPtr, nSharedVars, microTaskPtr, nativeSharedVarsPtr);

// Clean-up child execution environments
Expand Down Expand Up @@ -170,8 +187,9 @@ static int32_t __kmpc_reduce_wrapper(wasm_exec_env_t execEnv,
reduceFunc,
lockPtr);

wasm::doOpenMPStartReduceCritical(
msg, level, numReduceVars, reduceVarPtrs, reduceVarsSize);
CALL_OPENMP_CATCH_EXCETION_NO_RETURN(wasm::doOpenMPStartReduceCritical(
msg, level, numReduceVars, reduceVarPtrs, reduceVarsSize));

return 1;
}

Expand All @@ -193,8 +211,9 @@ static int32_t __kmpc_reduce_nowait_wrapper(wasm_exec_env_t execEnv,
reduceFunc,
lockPtr);

wasm::doOpenMPStartReduceCritical(
msg, level, numReduceVars, reduceVarPtrs, reduceVarsSize);
CALL_OPENMP_CATCH_EXCETION_NO_RETURN(wasm::doOpenMPStartReduceCritical(
msg, level, numReduceVars, reduceVarPtrs, reduceVarsSize));

return 1;
}

Expand Down
Loading

0 comments on commit 2f17f92

Please sign in to comment.