Skip to content

Commit

Permalink
Added MasterActorResponsiveness_BENCHMARK_Test.
Browse files Browse the repository at this point in the history
  • Loading branch information
rukletsov committed Nov 20, 2018
1 parent 45bd70f commit 40dc508
Showing 1 changed file with 232 additions and 1 deletion.
233 changes: 232 additions & 1 deletion src/tests/master_benchmarks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <atomic>
#include <limits>
#include <memory>
#include <string>
#include <tuple>
#include <vector>

#include <mesos/resources.hpp>
#include <mesos/version.hpp>

#include <process/async.hpp>
#include <process/clock.hpp>
#include <process/collect.hpp>
#include <process/future.hpp>
#include <process/loop.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>
#include <process/statistics.hpp>

#include <stout/duration.hpp>
#include <stout/stopwatch.hpp>

#include "common/protobuf_utils.hpp"
Expand All @@ -36,21 +43,32 @@

namespace http = process::http;

using process::async;
using process::await;
using process::collect;
using process::Break;
using process::Clock;
using process::Continue;
using process::ControlFlow;
using process::Failure;
using process::Future;
using process::loop;
using process::Owned;
using process::PID;
using process::ProcessBase;
using process::Promise;
using process::spawn;
using process::Statistics;
using process::terminate;
using process::UPID;
using process::wait;

using std::atomic_bool;
using std::cout;
using std::endl;
using std::make_tuple;
using std::numeric_limits;
using std::shared_ptr;
using std::string;
using std::tie;
using std::tuple;
Expand Down Expand Up @@ -237,7 +255,7 @@ class TestSlave
~TestSlave()
{
terminate(process.get());
process::wait(process.get());
wait(process.get());
}

Future<Nothing> reregister()
Expand Down Expand Up @@ -482,6 +500,219 @@ TEST_P(MasterStateQuery_BENCHMARK_Test, GetState)
}


class MasterActorResponsiveness_BENCHMARK_Test
: public MesosTest,
public WithParamInterface<tuple<
size_t, size_t, size_t, size_t, size_t, size_t, size_t>> {};


INSTANTIATE_TEST_CASE_P(
AgentFrameworkTaskCount,
MasterActorResponsiveness_BENCHMARK_Test,
::testing::Values(
make_tuple(100, 10, 10, 10, 10, 50, 5),
make_tuple(1000, 10, 10, 10, 10, 10, 5)));


// This test indirectly measures how the Master actor is affected by serving
// '/state' requests. The response time for a lightweight '/health' endpoint
// is taken as a load indicator. We set up a lot of master state from artificial
// agents and send multiple '/state' queries while constantly probing '/health'.
// As the baseline only '/health' is queried.
//
// NOTE: This test can dead lock if the number of libprocess worker threads is
// insufficient. We observed deadlocks when
// `(numClients >= LIBPROCESS_NUM_WORKER_THREADS - 3)`. Once MESOS-9400 is
// fixed, we can add an assertion here.
TEST_P(MasterActorResponsiveness_BENCHMARK_Test, WithV0StateLoad)
{
size_t agentCount;
size_t frameworksPerAgent;
size_t tasksPerFramework;
size_t completedFrameworksPerAgent;
size_t tasksPerCompletedFramework;
size_t numRequests;
size_t numClients;

tie(agentCount,
frameworksPerAgent,
tasksPerFramework,
completedFrameworksPerAgent,
tasksPerCompletedFramework,
numRequests,
numClients) = GetParam();

const string indicatorEndpoint = "health";
const string stateEndpoint = "state";

// Disable authentication to avoid the overhead, since we don't care about
// it in this test.
master::Flags masterFlags = CreateMasterFlags();
masterFlags.authenticate_agents = false;

Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
ASSERT_SOME(master);

vector<Owned<TestSlave>> slaves;

for (size_t i = 0; i < agentCount; i++) {
SlaveID slaveId;
slaveId.set_value("agent" + stringify(i));

slaves.push_back(Owned<TestSlave>(new TestSlave(
master.get()->pid,
slaveId,
frameworksPerAgent,
tasksPerFramework,
completedFrameworksPerAgent,
tasksPerCompletedFramework)));
}

cout << "Test setup: " << agentCount << " agents with a total of "
<< frameworksPerAgent * tasksPerFramework * agentCount
<< " running tasks and "
<< completedFrameworksPerAgent * tasksPerCompletedFramework * agentCount
<< " completed tasks" << endl;

vector<Future<Nothing>> reregistered;

foreach (const Owned<TestSlave>& slave, slaves) {
reregistered.push_back(slave->reregister());
}

// Wait all agents to finish reregistration.
await(reregistered).await();

Clock::pause();
Clock::settle();
Clock::resume();

// A helper sending a single request and measuring the time it takes to
// receive a response.
auto singleRequest = [master](const string& endpoint) -> Future<Duration> {
shared_ptr<Stopwatch> watch(new Stopwatch);
watch->start();

Future<http::Response> response = http::get(
master.get()->pid,
endpoint,
None(),
createBasicAuthHeaders(DEFAULT_CREDENTIAL));

return response.then([watch](const http::Response& r) -> Future<Duration> {
watch->stop();
EXPECT_EQ(r.status, http::OK().status);
return watch->elapsed();
});
};

// Synchronizes completion of all lambdas sending requests.
atomic_bool stop = { false };

// A helper sending `numRequests` requests to `endpoint`. An early exit
// is possible if `stop` is set. Note that this lambda sets `stop` once
// `numRequests` requests have been sent. The intention is to synchronize
// completion across all running lambdas.
auto repeatRequests = [singleRequest, &stop](
const string& endpoint, size_t numRequests) -> vector<Duration> {
vector<Duration> durations;

size_t remaining = numRequests;
auto f = loop(
None(),
[=]() {
return singleRequest(endpoint);
},
[&remaining, &durations, &stop](
const Duration& d) -> ControlFlow<Nothing> {
durations.push_back(d);

if (--remaining <= 0) {
stop.store(true);
}

if (stop.load()) {
return Break();
} else {
return Continue();
}
});

f.await();
EXPECT_TRUE(f.isReady());

return durations;
};

auto printStats = [](const vector<Duration>& durations) {
Option<Statistics<Duration>> s =
Statistics<Duration>::from(durations.cbegin(), durations.cend());
EXPECT_SOME(s);

cout << "[" << s->min << ", " << s->p25 << ", " << s->p50 << ", "
<< s->p75 << ", " << s->p90 << ", " << s->max << "]"
<< " from " << s->count << " measurements" << endl;
};

// First measure the average response time for the `indicatorEndpoint` only
// as the baseline.
cout << "Baseline: launching " << numRequests
<< " '/" << indicatorEndpoint<< "'" << " requests" << endl;

Future<vector<Duration>> indicatorFinished = async(
repeatRequests, indicatorEndpoint, numRequests);
indicatorFinished.await();
CHECK_READY(indicatorFinished);

cout << "Results [min, p25, p50, p75, p90, max]: " << endl
<< " '/" << indicatorEndpoint << "' -> ";
printStats(indicatorFinished.get());

Clock::pause();
Clock::settle();
Clock::resume();

// Now measure the average response times when request for both
// `indicatorEndpoint` and `stateEndpoint` are sent in parallel.
// Stop when `numRequests` to `stateEndpoint` have been sent.
stop.store(false);

cout << "Benchmark: launching "
<< numRequests << " '/" << indicatorEndpoint << "'"
<< " requests with up to " << numClients << " * " << numRequests
<< " '/" << stateEndpoint << "'" << " requests in background" << endl;

vector<Future<vector<Duration>>> stateFinished;
while (numClients-- > 0) {
stateFinished.push_back(async(
repeatRequests, stateEndpoint, numeric_limits<size_t>::max()));
}

indicatorFinished = async(
repeatRequests, indicatorEndpoint, numRequests);

Future<vector<vector<Duration>>> collected = collect(stateFinished);
collected.await();
CHECK_READY(collected);

indicatorFinished.await();
CHECK_READY(indicatorFinished);

// Aggregate response times for all `/state` clients.
vector<Duration> aggregatedState;
foreach (const vector<Duration>& v, collected.get()) {
aggregatedState.insert(aggregatedState.end(), v.cbegin(), v.cend());
}

cout << "Results [min, p25, p50, p75, p90, max]: " << endl
<< " '/" << indicatorEndpoint << "' -> ";
printStats(indicatorFinished.get());

cout << " '/" << stateEndpoint << "' -> ";
printStats(aggregatedState);
}


class MasterMetricsQuery_BENCHMARK_Test
: public MesosTest,
public WithParamInterface<tuple<
Expand Down

0 comments on commit 40dc508

Please sign in to comment.