From 126849d53d52b4eeb1c94ad8cba7209c48d468c8 Mon Sep 17 00:00:00 2001 From: Greg Brandt Date: Wed, 10 Sep 2014 11:37:00 -0700 Subject: [PATCH] Adds -maxChannelLifeMillis CLI argument to BenchmarkDriver --- .../helix/ipc/benchmark/BenchmarkDriver.java | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/helix-ipc/src/test/java/org/apache/helix/ipc/benchmark/BenchmarkDriver.java b/helix-ipc/src/test/java/org/apache/helix/ipc/benchmark/BenchmarkDriver.java index 2d682f7ba2..13d9b3f9f4 100644 --- a/helix-ipc/src/test/java/org/apache/helix/ipc/benchmark/BenchmarkDriver.java +++ b/helix-ipc/src/test/java/org/apache/helix/ipc/benchmark/BenchmarkDriver.java @@ -64,18 +64,24 @@ public class BenchmarkDriver implements Runnable { private final AtomicBoolean isShutdown; private final byte[] messageBytes; private final int numConnections; + private final long maxChannelLifeMillis; private HelixIPCService ipcService; private String localhost; private Thread[] trafficThreads; - public BenchmarkDriver(int port, int numPartitions, int numThreads, int messageSize, - int numConnections) { + public BenchmarkDriver(int port, + int numPartitions, + int numThreads, + int messageSize, + int numConnections, + long maxChannelLifeMillis) { this.port = port; this.numPartitions = numPartitions; this.isShutdown = new AtomicBoolean(true); this.trafficThreads = new Thread[numThreads]; this.numConnections = numConnections; + this.maxChannelLifeMillis = maxChannelLifeMillis; StringBuilder sb = new StringBuilder(); for (int i = 0; i < messageSize; i++) { @@ -105,8 +111,10 @@ public void stopTraffic() { localhost = InetAddress.getLocalHost().getCanonicalHostName(); ipcService = new NettyHelixIPCService(new NettyHelixIPCService.Config() - .setInstanceName(localhost + "_" + port).setPort(port) - .setNumConnections(numConnections)); + .setInstanceName(localhost + "_" + port) + .setPort(port) + .setNumConnections(numConnections) + .setMaxChannelLifeMillis(maxChannelLifeMillis)); // Counts number of messages received, and ack them ipcService.registerCallback(MESSAGE_TYPE, new HelixIPCCallback() { @@ -193,6 +201,7 @@ public static void main(String[] args) throws Exception { options.addOption("threads", true, "Number of threads"); options.addOption("messageSize", true, "Message size in bytes"); options.addOption("numConnections", true, "Number of connections between nodes"); + options.addOption("maxChannelLifeMillis", true, "Maximum length of time to keep Netty Channel open"); CommandLine commandLine = new GnuParser().parse(options, args); @@ -211,10 +220,13 @@ public void run() { } }); - new BenchmarkDriver(Integer.parseInt(commandLine.getArgs()[0]), Integer.parseInt(commandLine - .getOptionValue("partitions", "1")), Integer.parseInt(commandLine.getOptionValue("threads", - "1")), Integer.parseInt(commandLine.getOptionValue("messageSize", "1024")), - Integer.parseInt(commandLine.getOptionValue("numConnections", "1"))).run(); + new BenchmarkDriver( + Integer.parseInt(commandLine.getArgs()[0]), + Integer.parseInt(commandLine.getOptionValue("partitions", "1")), + Integer.parseInt(commandLine.getOptionValue("threads", "1")), + Integer.parseInt(commandLine.getOptionValue("messageSize", "1024")), + Integer.parseInt(commandLine.getOptionValue("numConnections", "1")), + Long.parseLong(commandLine.getOptionValue("maxChannelLifeMillis", "5000"))).run(); latch.await(); }