Skip to content

Commit

Permalink
Clean up benchmark internal communication.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Dec 13, 2018
1 parent 453a4ce commit f7f47b4
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 24 deletions.
20 changes: 18 additions & 2 deletions bench/src/main/java/io/atomix/bench/BenchmarkConfig.java
Expand Up @@ -44,6 +44,22 @@ public class BenchmarkConfig {
private boolean includeEvents = DEFAULT_INCLUDE_EVENTS;
private boolean deterministic = DEFAULT_DETERMINISTIC;

public BenchmarkConfig() {
}

public BenchmarkConfig(BenchmarkConfig config) {
this.benchId = config.benchId;
this.protocol = config.protocol;
this.operations = config.operations;
this.writePercentage = config.writePercentage;
this.numKeys = config.numKeys;
this.keyLength = config.keyLength;
this.numValues = config.numValues;
this.valueLength = config.valueLength;
this.includeEvents = config.includeEvents;
this.deterministic = config.deterministic;
}

public String getBenchId() {
return benchId;
}
Expand All @@ -53,11 +69,11 @@ public BenchmarkConfig setBenchId(String benchId) {
return this;
}

public PrimitiveProtocolConfig getProtocol() {
public PrimitiveProtocolConfig<?> getProtocol() {
return protocol;
}

public BenchmarkConfig setProtocol(PrimitiveProtocolConfig protocol) {
public BenchmarkConfig setProtocol(PrimitiveProtocolConfig<?> protocol) {
this.protocol = protocol;
return this;
}
Expand Down
14 changes: 11 additions & 3 deletions bench/src/main/java/io/atomix/bench/BenchmarkController.java
Expand Up @@ -19,6 +19,8 @@
import io.atomix.cluster.MemberId;
import io.atomix.core.Atomix;
import io.atomix.utils.concurrent.Futures;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
Expand All @@ -30,6 +32,8 @@
* Benchmark controller.
*/
public class BenchmarkController {
private static final Logger LOGGER = LoggerFactory.getLogger(BenchmarkController.class);

private final Atomix atomix;
private final BenchmarkConfig config;
private final Map<MemberId, BenchmarkProgress> reports = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -69,10 +73,10 @@ public BenchmarkProgress getProgress() {
}

int totalOperations = 0;
int totalTime = 0;
long totalTime = 0;
for (BenchmarkProgress progress : reports.values()) {
totalOperations += progress.getOperations();
totalTime += progress.getTime();
totalTime = Math.max(totalOperations, progress.getTime());
}
return new BenchmarkProgress(BenchmarkState.RUNNING, totalOperations, totalTime);
}
Expand All @@ -92,6 +96,8 @@ public BenchmarkResult getResult() {
* @return a future to be completed once the benchmark has been started
*/
public CompletableFuture<Void> start() {
LOGGER.info("Starting benchmark {}", getBenchId());

atomix.getCommunicationService().subscribe(
config.getBenchId(),
BenchmarkSerializer.INSTANCE::decode,
Expand All @@ -109,7 +115,7 @@ public CompletableFuture<Void> start() {
List<CompletableFuture<Void>> runFutures = benchMembers.stream()
.map(member -> atomix.getCommunicationService().<BenchmarkConfig, Void>send(
BenchmarkConstants.RUN_SUBJECT,
new BenchmarkConfig().setOperations(operationsPerMember),
new BenchmarkConfig(config).setOperations(operationsPerMember),
BenchmarkSerializer.INSTANCE::encode,
BenchmarkSerializer.INSTANCE::decode,
member))
Expand All @@ -123,6 +129,8 @@ public CompletableFuture<Void> start() {
* @return a future to be completed once the benchmark has been stopped
*/
public CompletableFuture<Void> stop() {
LOGGER.info("Stopping benchmark {}", config.getBenchId());

atomix.getCommunicationService().unsubscribe(config.getBenchId());
if (reports.isEmpty()) {
return CompletableFuture.completedFuture(null);
Expand Down
10 changes: 5 additions & 5 deletions bench/src/main/java/io/atomix/bench/BenchmarkResource.java
Expand Up @@ -73,9 +73,9 @@ public Response getProgress(
@Context ClusterMembershipService membershipService,
@Context ClusterCommunicationService communicationService) {
try {
BenchmarkProgress progress = communicationService.<Void, BenchmarkProgress>send(
BenchmarkProgress progress = communicationService.<String, BenchmarkProgress>send(
BenchmarkConstants.PROGRESS_SUBJECT,
null,
testId,
BenchmarkSerializer.INSTANCE::encode,
BenchmarkSerializer.INSTANCE::decode,
membershipService.getLocalMember().id())
Expand All @@ -95,9 +95,9 @@ public Response getResult(
@Context ClusterMembershipService membershipService,
@Context ClusterCommunicationService communicationService) {
try {
BenchmarkResult result = communicationService.<Void, BenchmarkResult>send(
BenchmarkResult result = communicationService.<String, BenchmarkResult>send(
BenchmarkConstants.RESULT_SUBJECT,
null,
testId,
BenchmarkSerializer.INSTANCE::encode,
BenchmarkSerializer.INSTANCE::decode,
membershipService.getLocalMember().id())
Expand All @@ -118,7 +118,7 @@ public Response stopTest(
try {
communicationService.send(
BenchmarkConstants.STOP_SUBJECT,
null,
testId,
membershipService.getLocalMember().id())
.get(10, TimeUnit.SECONDS);
return Response.ok().build();
Expand Down
28 changes: 20 additions & 8 deletions bench/src/main/java/io/atomix/bench/BenchmarkRunner.java
Expand Up @@ -17,7 +17,6 @@

import io.atomix.core.Atomix;
import io.atomix.core.map.AtomicMap;
import io.atomix.primitive.protocol.PrimitiveProtocol;
import io.atomix.primitive.protocol.ProxyProtocol;
import io.atomix.utils.serializer.Namespaces;
import io.atomix.utils.serializer.Serializer;
Expand All @@ -39,7 +38,7 @@
* Benchmark runner.
*/
public class BenchmarkRunner {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final Logger LOGGER = LoggerFactory.getLogger(BenchmarkRunner.class);

private static final int REPORT_PERIOD = 1_000;

Expand All @@ -48,8 +47,10 @@ public class BenchmarkRunner {
private final Atomix atomix;
private final BenchmarkConfig config;

private final ExecutorService runnerExecutor = Executors.newSingleThreadExecutor(namedThreads("atomix-bench-runner", log));
private final ScheduledExecutorService reporterExecutor = Executors.newSingleThreadScheduledExecutor(namedThreads("atomix-bench-reporter", log));
private final ExecutorService runnerExecutor = Executors.newSingleThreadExecutor(
namedThreads("atomix-bench-runner", LOGGER));
private final ScheduledExecutorService reporterExecutor = Executors.newSingleThreadScheduledExecutor(
namedThreads("atomix-bench-reporter", LOGGER));
private long startTime;
private volatile boolean running;
private final AtomicInteger totalCounter = new AtomicInteger();
Expand All @@ -74,6 +75,17 @@ public String getBenchId() {
* Starts the benchmark runner.
*/
public void start() {
LOGGER.info("Running benchmark {}", getBenchId());

LOGGER.debug("operations: {}", config.getOperations());
LOGGER.debug("writePercentage: {}", config.getWritePercentage());
LOGGER.debug("numKeys: {}", config.getNumKeys());
LOGGER.debug("keyLength: {}", config.getKeyLength());
LOGGER.debug("numValues: {}", config.getNumValues());
LOGGER.debug("valueLength: {}", config.getValueLength());
LOGGER.debug("includeEvents: {}", config.isIncludeEvents());
LOGGER.debug("deterministic: {}", config.isDeterministic());

reporterExecutor.scheduleAtFixedRate(this::report, REPORT_PERIOD, REPORT_PERIOD, TimeUnit.MILLISECONDS);
running = true;
if (config.isDeterministic()) {
Expand Down Expand Up @@ -156,7 +168,7 @@ public void run() {
try {
submit();
} catch (Exception e) {
log.warn("Exception during cycle", e);
LOGGER.warn("Exception during benchmark cycle", e);
} finally {
totalCounter.incrementAndGet();
}
Expand All @@ -176,8 +188,8 @@ void write(String key, String value) {

@SuppressWarnings("unchecked")
void setup() {
ProxyProtocol protocol = (ProxyProtocol) atomix.getRegistry().getType(PrimitiveProtocol.Type.class, "foo").newProtocol(config.getProtocol());
map = atomix.<String, String>atomicMapBuilder("bench")
ProxyProtocol protocol = (ProxyProtocol) config.getProtocol().getType().newProtocol(config.getProtocol());
map = atomix.<String, String>atomicMapBuilder(config.getBenchId())
.withSerializer(Serializer.using(Namespaces.BASIC))
.withProtocol(protocol)
.build();
Expand All @@ -190,7 +202,7 @@ void setup() {
abstract void submit();

void teardown() {
//map.destroy();
map.delete();
}
}

Expand Down
Expand Up @@ -37,6 +37,7 @@ public final class BenchmarkSerializer {
.nextId(Namespaces.BEGIN_USER_CUSTOM_ID)
.register(BenchmarkConfig.class)
.register(BenchmarkProgress.class)
.register(BenchmarkState.class)
.register(BenchmarkResult.class)
.register(MultiRaftProtocolConfig.class)
.register(Murmur3Partitioner.class)
Expand Down
12 changes: 6 additions & 6 deletions dist/src/main/resources/bin/run_bench.py
Expand Up @@ -67,14 +67,14 @@ def run_bench(args):
print("Failed to start benchmark")
sys.exit(1)

while True:
try:
try:
while True:
if not report_progress(bench_id, url):
break
except KeyboardInterrupt:
stop_bench(bench_id)
sys.exit(1)
time.sleep(1)
time.sleep(1)
except KeyboardInterrupt:
stop_bench(bench_id, url)
sys.exit(1)

report_result(bench_id, url)

Expand Down

0 comments on commit f7f47b4

Please sign in to comment.