Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow microbench runner to execute threads #601

Merged
merged 4 commits into from
Mar 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions include/runner/MicrobenchRunner.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
#include <faabric/proto/faabric.pb.h>

#include <memory>
#include <string>

namespace runner {
class MicrobenchRunner
{
public:
static int execute(const std::string& inFile, const std::string& outFile);

static int doRun(std::ofstream& outFs,
const std::string& user,
const std::string& function,
int nRuns,
const std::string& inputData);

static std::shared_ptr<faabric::BatchExecuteRequest> createBatchRequest(
const std::string& user,
const std::string& function,
const std::string& inputData);
};
}
65 changes: 42 additions & 23 deletions src/runner/MicrobenchRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <wasm/WasmModule.h>
#include <wavm/WAVMWasmModule.h>

#include <faabric/proto/faabric.pb.h>
#include <faabric/runner/FaabricMain.h>
#include <faabric/scheduler/ExecutorContext.h>
#include <faabric/scheduler/ExecutorFactory.h>
Expand All @@ -26,16 +27,11 @@ using namespace faabric::util;

namespace runner {

int doRun(std::ofstream& outFs,
const std::string& user,
const std::string& function,
int nRuns,
const std::string& inputData)
std::shared_ptr<faabric::BatchExecuteRequest>
MicrobenchRunner::createBatchRequest(const std::string& user,
const std::string& function,
const std::string& inputData)
{
// Clear out redis
faabric::redis::Redis& redis = faabric::redis::Redis::getQueue();
redis.flushAll();

// Set up invocation message
std::shared_ptr<faabric::BatchExecuteRequest> req =
faabric::util::batchExecFactory(user, function, 1);
Expand All @@ -52,6 +48,25 @@ int doRun(std::ofstream& outFs,

msg.set_inputdata(inputData);

// Force local to avoid any scheduling logic
msg.set_topologyhint("FORCE_LOCAL");

return req;
}

int MicrobenchRunner::doRun(std::ofstream& outFs,
const std::string& user,
const std::string& function,
int nRuns,
const std::string& inputData)
{
// Clear out redis
faabric::redis::Redis& redis = faabric::redis::Redis::getQueue();
redis.flushAll();

auto req = createBatchRequest(user, function, inputData);
faabric::Message& msg = req->mutable_messages()->at(0);

// Check files have been uploaded
storage::FileLoader& loader = storage::getFileLoader();
std::vector<uint8_t> wasmBytes = loader.loadFunctionWasm(msg);
Expand All @@ -68,32 +83,28 @@ int doRun(std::ofstream& outFs,
}

// Create faaslet
faaslet::Faaslet f(msg);
faabric::scheduler::ExecutorContext::set(&f, req, 0);
faabric::scheduler::Scheduler& sch = faabric::scheduler::getScheduler();

// Preflight if necessary
if (PREFLIGHT_CALLS) {
f.executeTask(0, 0, req);
f.reset(msg);
auto preflightReq = createBatchRequest(user, function, inputData);
sch.callFunctions(preflightReq);
sch.getFunctionResult(preflightReq->messages().at(0).id(), 10000);
}

// Main loop
for (int r = 0; r < nRuns; r++) {
// Execute
TimePoint execStart = startTimer();
int returnValue = f.executeTask(0, 0, req);
sch.callFunctions(req);
faabric::Message res = sch.getFunctionResult(msg.id(), 10000);
long execNanos = getTimeDiffNanos(execStart);
float execMicros = float(execNanos) / 1000;

// Reset
TimePoint resetStart = startTimer();
f.reset(msg);
long resetNanos = getTimeDiffNanos(resetStart);
float resetMicros = float(resetNanos) / 1000;

// Write result line
int returnValue = res.returnvalue();
outFs << user << "," << function << "," << returnValue << ","
<< execMicros << "," << resetMicros << std::endl;
<< execMicros << std::endl;

if (returnValue != 0) {
SPDLOG_ERROR("{}/{} failed on run {} with value {}",
Expand All @@ -119,8 +130,7 @@ int MicrobenchRunner::execute(const std::string& inFile,
// Set up output file
std::ofstream outFs;
outFs.open(outFile);
outFs << "User,Function,Return value,Execution (us),Reset (us)"
<< std::endl;
outFs << "User,Function,Return value,Execution (us)" << std::endl;

std::fstream inFs;
inFs.open(inFile, std::ios::in);
Expand All @@ -130,6 +140,13 @@ int MicrobenchRunner::execute(const std::string& inFile,
return 1;
}

// Set up the runner
std::shared_ptr<faaslet::FaasletFactory> fac =
std::make_shared<faaslet::FaasletFactory>();
faabric::scheduler::setExecutorFactory(fac);
faabric::runner::FaabricMain m(fac);
m.startRunner();

std::string nextLine;
while (getline(inFs, nextLine)) {
// Skip empty line
Expand Down Expand Up @@ -168,6 +185,8 @@ int MicrobenchRunner::execute(const std::string& inFile,
outFs.close();
inFs.close();

m.shutdown();

return 0;
}
}
5 changes: 0 additions & 5 deletions src/runner/microbench_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ int main(int argc, char* argv[])
conf.globalMessageTimeout = 60000;
faasmConf.chainedCallTimeout = 60000;

// Set executor factory
std::shared_ptr<faaslet::FaasletFactory> fac =
std::make_shared<faaslet::FaasletFactory>();
faabric::scheduler::setExecutorFactory(fac);

int returnValue = MicrobenchRunner::execute(inFile, outFile);

faabric::transport::closeGlobalMessageContext();
Expand Down
20 changes: 12 additions & 8 deletions tests/test/runner/test_microbench_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,13 @@ void checkLine(const std::string& line,
std::vector<std::string> lineParts;
boost::split(lineParts, line, [](char c) { return c == ','; });

REQUIRE(lineParts.size() == 5);
REQUIRE(lineParts.size() == 4);
REQUIRE(lineParts[0] == user);
REQUIRE(lineParts[1] == function);
REQUIRE(lineParts[2] == "0");

float runTime = std::stof(lineParts[3]);
float resetTime = std::stof(lineParts[4]);

REQUIRE(runTime > 0);
REQUIRE(resetTime > 0);
}

TEST_CASE_METHOD(MultiRuntimeFunctionExecTestFixture,
Expand All @@ -51,9 +48,13 @@ TEST_CASE_METHOD(MultiRuntimeFunctionExecTestFixture,
std::ofstream specFs;
specFs.open(specFile);

// Override CPU count for executing OpenMP function
faabric::util::getSystemConfig().overrideCpuCount = 30;

specFs << "demo,echo,4,blah" << std::endl;
specFs << "demo,hello,3" << std::endl;
specFs << "python,hello,3" << std::endl;
specFs << "omp,hellomp,2" << std::endl;
specFs.close();

std::string outFile = "/tmp/microbench_out.csv";
Expand All @@ -64,10 +65,9 @@ TEST_CASE_METHOD(MultiRuntimeFunctionExecTestFixture,
std::vector<std::string> lines;
boost::split(lines, result, [](char c) { return c == '\n'; });

REQUIRE(lines.size() == 12);
REQUIRE(lines.size() == 14);

REQUIRE(lines.at(0) ==
"User,Function,Return value,Execution (us),Reset (us)");
REQUIRE(lines.at(0) == "User,Function,Return value,Execution (us)");

for (int i = 1; i < 5; i++) {
checkLine(lines.at(i), "demo", "echo");
Expand All @@ -81,6 +81,10 @@ TEST_CASE_METHOD(MultiRuntimeFunctionExecTestFixture,
checkLine(lines.at(i), "python", "hello");
}

REQUIRE(lines.at(11).empty());
for (int i = 11; i < 13; i++) {
checkLine(lines.at(i), "omp", "hellomp");
}

REQUIRE(lines.at(13).empty());
}
}