Skip to content

Commit

Permalink
Add latency test to ProfilingApplication
Browse files Browse the repository at this point in the history
  • Loading branch information
kenhuuu committed Jun 21, 2024
1 parent dc6e715 commit 7ff2fdb
Showing 1 changed file with 104 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import java.io.File;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
Expand All @@ -48,7 +50,7 @@
*/
public class ProfilingApplication {

private static final Random random = new Random();
private static final Random random = new Random(0); // Same seed to ensure consistent test runs.
private static final String[] scripts = new String[]{
"g.V()",
"g.V(1).out('knows')",
Expand Down Expand Up @@ -87,7 +89,7 @@ public ProfilingApplication(final String executionName, final Cluster cluster, f
this.exercise = exercise;
}

public long execute() throws Exception {
public long executeThroughput() throws Exception {
final AtomicInteger tooSlow = new AtomicInteger(0);

final Client client = cluster.connect();
Expand Down Expand Up @@ -130,16 +132,51 @@ public long execute() throws Exception {
}
}

public double executeLatency() throws Exception {
final Client client = cluster.connect();
final String executionId = "[" + executionName + "]";
try {
client.init();

final long start = System.nanoTime();
int size = 0;
final Iterator itr = client.submitAsync(script).get().iterator();
try {
while (true) {
itr.next();
size++;
}
} catch (NoSuchElementException nsee) {
; // Expected as hasNext() not called to increase performance.
}
final long end = System.nanoTime();
final long total = (end - start);


final double totalSeconds = total / 1000000000d;
System.out.println(String.format(StringUtils.rightPad(executionId, 10) + "time: %s, result count: %s", StringUtils.rightPad(String.valueOf(totalSeconds), 7), StringUtils.rightPad(String.valueOf(size), 10)));
return totalSeconds;
} catch (Exception ex) {
ex.printStackTrace();
throw new RuntimeException(ex);
} finally {
client.close();
}
}

private String chooseScript() {
return scripts[random.nextInt(scripts.length - 1)];
}

public enum TestType { LATENCY, THROUGHPUT };

public static void main(final String[] args) {
final Map<String,Object> options = ElementHelper.asMap(args);
final boolean noExit = Boolean.parseBoolean(options.getOrDefault("noExit", "false").toString());
final int parallelism = Integer.parseInt(options.getOrDefault("parallelism", "16").toString());
final BasicThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("profiler-%d").build();
final ExecutorService executor = Executors.newFixedThreadPool(parallelism, threadFactory);
final TestType testType = TestType.values()[(Integer.parseInt(options.getOrDefault("testType", "1").toString()) % TestType.values().length)];

final String host = options.getOrDefault("host", "localhost").toString();
final int minExpectedRps = Integer.parseInt(options.getOrDefault("minExpectedRps", "1000").toString());
Expand All @@ -155,7 +192,7 @@ public static void main(final String[] args) {
final int maxInProcessPerConnection = Integer.parseInt(options.getOrDefault("maxInProcessPerConnection", "64").toString());
final int minInProcessPerConnection = Integer.parseInt(options.getOrDefault("minInProcessPerConnection", "16").toString());
final int maxWaitForConnection = Integer.parseInt(options.getOrDefault("maxWaitForConnection", "3000").toString());
final int workerPoolSize = Integer.parseInt(options.getOrDefault("workerPoolSize", "2").toString());
final int workerPoolSize = Integer.parseInt(options.getOrDefault("workerPoolSize", Runtime.getRuntime().availableProcessors() * 2).toString());
final int tooSlowThreshold = Integer.parseInt(options.getOrDefault("tooSlowThreshold", "125").toString());
final String channelizer = options.getOrDefault("channelizer", Channelizer.WebSocketChannelizer.class.getName()).toString();
final String serializer = options.getOrDefault("serializer", Serializers.GRAPHBINARY_V1.name()).toString();
Expand All @@ -177,6 +214,12 @@ public static void main(final String[] args) {
.workerPoolSize(workerPoolSize).create();

try {
if (TestType.LATENCY == testType) {
System.out.println("-----------------------LATENCY TEST SELECTED----------------------");
} else {
System.out.println("---------------------THROUGHPUT TEST SELECTED---------------------");
}

if (exercise) {
System.out.println("--------------------------INITIALIZATION--------------------------");
final Client client = cluster.connect();
Expand All @@ -189,43 +232,72 @@ public static void main(final String[] args) {
System.out.println("Modern graph loaded");
}

final Object fileName = options.get("store");
final File f = null == fileName ? null : new File(fileName.toString());
if (f != null && f.length() == 0) {
try (final PrintWriter writer = new PrintWriter(new BufferedWriter(new FileWriter(f, true)))) {
writer.println("parallelism\tnioPoolSize\tminConnectionPoolSize\tmaxConnectionPoolSize\tminSimultaneousUsagePerConnection\tmaxSimultaneousUsagePerConnection\tminInProcessPerConnection\tmaxInProcessPerConnection\tworkerPoolSize\trequestPerSecond");
if (TestType.THROUGHPUT == testType) {
final Object fileName = options.get("store");
final File f = null == fileName ? null : new File(fileName.toString());
if (f != null && f.length() == 0) {
try (final PrintWriter writer = new PrintWriter(new BufferedWriter(new FileWriter(f, true)))) {
writer.println("parallelism\tnioPoolSize\tminConnectionPoolSize\tmaxConnectionPoolSize\tminSimultaneousUsagePerConnection\tmaxSimultaneousUsagePerConnection\tminInProcessPerConnection\tmaxInProcessPerConnection\tworkerPoolSize\trequestPerSecond");
}
}
}

// not much point to continuing with a line of tests if we can't get at least minExpectedRps.
final AtomicBoolean meetsRpsExpectation = new AtomicBoolean(true);
System.out.println("---------------------------WARMUP CYCLE---------------------------");
for (int ix = 0; ix < warmups && meetsRpsExpectation.get(); ix++) {
final long averageRequestsPerSecond = new ProfilingApplication("warmup-" + (ix + 1), cluster, 1000, executor, script, tooSlowThreshold, exercise).execute();
meetsRpsExpectation.set(averageRequestsPerSecond > minExpectedRps);
TimeUnit.SECONDS.sleep(1); // pause between executions
}
// not much point to continuing with a line of tests if we can't get at least minExpectedRps.
final AtomicBoolean meetsRpsExpectation = new AtomicBoolean(true);
System.out.println("---------------------------WARMUP CYCLE---------------------------");
for (int ix = 0; ix < warmups && meetsRpsExpectation.get(); ix++) {
final long averageRequestsPerSecond = new ProfilingApplication("warmup-" + (ix + 1), cluster, 1000, executor, script, tooSlowThreshold, exercise).executeThroughput();
meetsRpsExpectation.set(averageRequestsPerSecond > minExpectedRps);
TimeUnit.SECONDS.sleep(1); // pause between executions
}

final AtomicBoolean exceededTimeout = new AtomicBoolean(false);
long totalRequestsPerSecond = 0;
final AtomicBoolean exceededTimeout = new AtomicBoolean(false);
long totalRequestsPerSecond = 0;

// no need to execute this if we didn't pass the basic expectation in the warmups
if (exercise || meetsRpsExpectation.get()) {
final long start = System.nanoTime();
System.out.println("----------------------------TEST CYCLE----------------------------");
for (int ix = 0; ix < executions && !exceededTimeout.get(); ix++) {
totalRequestsPerSecond += new ProfilingApplication("test-" + (ix + 1), cluster, requests, executor, script, tooSlowThreshold, exercise).execute();
exceededTimeout.set((System.nanoTime() - start) > TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS));
// no need to execute this if we didn't pass the basic expectation in the warmups
if (exercise || meetsRpsExpectation.get()) {
final long start = System.nanoTime();
System.out.println("----------------------------TEST CYCLE----------------------------");
for (int ix = 0; ix < executions && !exceededTimeout.get(); ix++) {
totalRequestsPerSecond += new ProfilingApplication("test-" + (ix + 1), cluster, requests, executor, script, tooSlowThreshold, exercise).executeThroughput();
exceededTimeout.set((System.nanoTime() - start) > TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS));
TimeUnit.SECONDS.sleep(1); // pause between executions
}
}

final int averageRequestPerSecond = !meetsRpsExpectation.get() || exceededTimeout.get() ? 0 : Math.round(totalRequestsPerSecond / executions);
System.out.println(String.format("avg req/sec: %s", averageRequestPerSecond));
if (f != null) {
try (final PrintWriter writer = new PrintWriter(new BufferedWriter(new FileWriter(f, true)))) {
writer.println(String.join("\t", String.valueOf(parallelism), String.valueOf(nioPoolSize), String.valueOf(minConnectionPoolSize), String.valueOf(maxConnectionPoolSize), String.valueOf(minSimultaneousUsagePerConnection), String.valueOf(maxSimultaneousUsagePerConnection), String.valueOf(minInProcessPerConnection), String.valueOf(maxInProcessPerConnection), String.valueOf(workerPoolSize), String.valueOf(averageRequestPerSecond)));
}
}
} else if (TestType.LATENCY == testType) {
final AtomicBoolean meetsTimeoutExpectation = new AtomicBoolean(true);
System.out.println("---------------------------WARMUP CYCLE---------------------------");
for (int ix = 0; ix < warmups && meetsTimeoutExpectation.get(); ix++) {
final double latency = new ProfilingApplication("warmup-" + (ix + 1), cluster, 1000, executor, script, tooSlowThreshold, exercise).executeLatency();
meetsTimeoutExpectation.set(latency < timeout);
TimeUnit.SECONDS.sleep(1); // pause between executions
}
}

final int averageRequestPerSecond = !meetsRpsExpectation.get() || exceededTimeout.get() ? 0 : Math.round(totalRequestsPerSecond / executions);
System.out.println(String.format("avg req/sec: %s", averageRequestPerSecond));
if (f != null) {
try (final PrintWriter writer = new PrintWriter(new BufferedWriter(new FileWriter(f, true)))) {
writer.println(String.join("\t", String.valueOf(parallelism), String.valueOf(nioPoolSize), String.valueOf(minConnectionPoolSize), String.valueOf(maxConnectionPoolSize), String.valueOf(minSimultaneousUsagePerConnection), String.valueOf(maxSimultaneousUsagePerConnection), String.valueOf(minInProcessPerConnection), String.valueOf(maxInProcessPerConnection), String.valueOf(workerPoolSize), String.valueOf(averageRequestPerSecond)));
final AtomicBoolean exceededTimeout = new AtomicBoolean(false);
double totalTime = 0;

// no need to execute this if we didn't pass the basic expectation in the warmups
if (exercise || meetsTimeoutExpectation.get()) {
final long start = System.nanoTime();
System.out.println("----------------------------TEST CYCLE----------------------------");
for (int ix = 0; ix < executions && !exceededTimeout.get(); ix++) {
totalTime += new ProfilingApplication("test-" + (ix + 1), cluster, requests, executor, script, tooSlowThreshold, exercise).executeLatency();
exceededTimeout.set((System.nanoTime() - start) > TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS));
TimeUnit.SECONDS.sleep(1); // pause between executions
}
}

final double averageLatency = !meetsTimeoutExpectation.get() || exceededTimeout.get() ? 0 : (totalTime / executions);
System.out.println(String.format("avg latency (sec/req): %s", averageLatency));
} else {
System.out.println("Encountered unknown testType. Please enter a valid value and try again.");
}

if (!noExit) System.exit(0);
Expand Down

0 comments on commit 7ff2fdb

Please sign in to comment.