Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: belaban/JGroups
base: b44d2c8
...
head fork: belaban/JGroups
compare: 5229483
  • 4 commits
  • 25 files changed
  • 0 commit comments
  • 1 contributor
Showing with 251 additions and 3,108 deletions.
  1. +0 −8 bin/dtest.sh
  2. +0 −64 conf/config.txt
  3. +1 −2  conf/jg-magic-map.xml
  4. +1 −2  conf/jg-protocol-ids.xml
  5. +144 −0 doc/manual/en/modules/advanced.xml
  6. +0 −77 src/org/jgroups/demos/dynamic/Arguments.java
  7. +0 −32 src/org/jgroups/demos/dynamic/Command.java
  8. +0 −184 src/org/jgroups/demos/dynamic/DTest.java
  9. +0 −44 src/org/jgroups/demos/dynamic/DTestHeader.java
  10. +0 −35 src/org/jgroups/demos/dynamic/commands/all.java
  11. +0 −58 src/org/jgroups/demos/dynamic/commands/bounce.java
  12. +0 −20 src/org/jgroups/demos/dynamic/commands/echo.java
  13. +38 −0 src/org/jgroups/util/Util.java
  14. +0 −103 tests/perf/org/jgroups/tests/perf/Configuration.java
  15. +0 −164 tests/perf/org/jgroups/tests/perf/Data.java
  16. +0 −419 tests/perf/org/jgroups/tests/perf/IPerf.java
  17. +67 −7 tests/perf/org/jgroups/tests/perf/MPerf.java
  18. +0 −84 tests/perf/org/jgroups/tests/perf/MemberInfo.java
  19. +0 −9 tests/perf/org/jgroups/tests/perf/Receiver.java
  20. +0 −959 tests/perf/org/jgroups/tests/perf/Test.java
  21. +0 −55 tests/perf/org/jgroups/tests/perf/Transport.java
  22. +0 −81 tests/perf/org/jgroups/tests/perf/transports/JGroupsClusterTransport.java
  23. +0 −154 tests/perf/org/jgroups/tests/perf/transports/JGroupsTransport.java
  24. +0 −306 tests/perf/org/jgroups/tests/perf/transports/TcpTransport.java
  25. +0 −241 tests/perf/org/jgroups/tests/perf/transports/UdpTransport.java
View
8 bin/dtest.sh
@@ -1,8 +0,0 @@
-
-# Author: Bela Ban
-
-#!/bin/bash
-
-JG=$HOME/JGroups
-
-jgroups.sh org.jgroups.demos.dynamic.DTest $*
View
64 conf/config.txt
@@ -1,64 +0,0 @@
-#
-# This file contains configuration for the JGroups performance tests (org.jgroups.tests.perf package)
-#
-
-# Class implementing the org.jgroups.tests.perf.Transport interface
-
-transport=org.jgroups.tests.perf.transports.JGroupsTransport
-#transport=org.jgroups.tests.perf.transports.JGroupsClusterTransport
-#transport=org.jgroups.tests.perf.transports.UdpTransport
-#transport=org.jgroups.tests.perf.transports.TcpTransport
-
-# Number of messages a sender multicasts
-num_msgs=10000
-
-# Message size in bytes.
-msg_size=1000
-
-
-# Expected number of group members.
-num_members=2
-
-# Number of senders in the group. Min 1, max num_members.
-num_senders=2
-
-# dump stats every n msgs
-log_interval=1000
-
-# number of ms to wait at the receiver to simulate delay caused by processing of the message
-processing_delay=0
-
-# Needs to either contain the full property string, or an URL pointing to a valid
-# location (needs to be changed)
-props=file:c:\\udp.xml
-
-
-# Dumps statistics about the transport
-dump_transport_stats=false
-
-# Register JGroups channel and protocols with MBeanServer, don't terminate at the end
-jmx=false
-
-
-#####################################################
-# These properties are only used by the UDP transport
-#####################################################
-bind_addr=localhost
-mcast_addr=228.1.2.3
-mcast_port=7500
-
-
-############################
-# only used by TCP Transport
-############################
-
-# List of hosts in the cluster. Since we don't specify ports, you cannot run multiple TcpTransports
-# on the same machine: each ember has to be run on a separate machine (this may be changed in a next version)
-cluster=127.0.0.1:7800,127.0.0.1:7801
-start_port=7800
-
-
-#################################################
-# JNDI name of topic (only used by JMS transport)
-#################################################
-topic=topic/testTopic
View
3  conf/jg-magic-map.xml
@@ -52,6 +52,5 @@
<class id="89" name="org.jgroups.protocols.COUNTER$CounterHeader"/>
<class id="90" name="org.jgroups.protocols.MERGE3$MergeHeader"/>
<class id="91" name="org.jgroups.protocols.RSVP$RsvpHeader"/>
- <class id="92" name="org.jgroups.demos.dynamic.DTestHeader"/>
- <class id="93" name="org.jgroups.tests.perf.MPerf$MPerfHeader"/>
+ <class id="92" name="org.jgroups.tests.perf.MPerf$MPerfHeader"/>
</magic-number-class-mapping>
View
3  conf/jg-protocol-ids.xml
@@ -56,8 +56,7 @@
<!-- IDs reserved for building blocks -->
<class id="200" name="org.jgroups.blocks.RequestCorrelator"/> <!-- ID should be the same as Global.BLOCKS_START_ID -->
<class id="201" name="org.jgroups.blocks.mux.MuxRequestCorrelator"/>
- <class id="202" name="org.jgroups.demos.dynamic.DTest"/>
- <class id="203" name="org.jgroups.tests.perf.MPerf"/>
+ <class id="202" name="org.jgroups.tests.perf.MPerf"/>
<!-- User-defined protocols should use IDs >= 1000 -->
View
144 doc/manual/en/modules/advanced.xml
@@ -2135,6 +2135,150 @@ msg.setFlag(Message.NO_FC);
</section>
+ <section id="PerformanceTests">
+ <title>Performance tests</title>
+ <para>
+ There are a number of performance tests shipped with JGroups. The section below discusses MPerf, which
+ is a replacement for (static) perf.Test. This change was done in 3.1.
+ </para>
+ <section id="MPerf">
+ <title>MPerf</title>
+ <para>
+ MPerf is a test which measures multicast performance. This doesn't mean <emphasis>IP multicast</emphasis>
+ performance, but <emphasis>point-to-multipoint</emphasis> performance. Point-to-multipoint means that
+ we measure performance of one-to-many messages; in other words, messages sent to all cluster members.
+ </para>
+ <para>
+ Compared to the old perf.Test, MPerf is dynamic; it doesn't need a setup file to define the
+ number of senders, number of messages to be sent and message size. Instead, all the configuration
+ needed by an instance of MPerf is an XML stack configuration, and configuration changes done in one
+ member are automatically broadcast to all other members.
+ </para>
+ <para>
+ MPerf can be started as follows:
+ </para>
+ <screen>
+java -cp $CLASSPATH -Djava.net.preferIPv4Stack=true org.jgroups.tests.perf.MPerf -props ./fast.xml
+ </screen>
+ <para>
+ This assumes that we're using IPv4 addresses (otherwise IPv6 addresses are used) and the JGroups JAR on
+ CLASSPATH.
+ </para>
+ <para>
+ A screen shot of MPerf looks like this (could be different, depending on the JGroups version):
+ </para>
+ <screen>
+[linux]/home/bela$ mperf.sh -props ./fast.xml -name B
+
+----------------------- MPerf -----------------------
+Date: Mon Dec 12 15:33:21 CET 2011
+Run by: bela
+JGroups version: 3.1.0.Alpha1
+
+-------------------------------------------------------------------
+GMS: address=B, cluster=mperf, physical address=192.168.1.5:46614
+-------------------------------------------------------------------
+** [A|9] [A, B]
+num_msgs=1000000
+msg_size=1000
+num_threads=1
+[1] Send [2] View
+[3] Set num msgs (1000000) [4] Set msg size (1KB) [5] Set threads (1)
+[6] New config (./fast.xml)
+[x] Exit this [X] Exit all
+ </screen>
+ <para>
+ We're starting MPerf with <code>-props ./fast.xml</code> and <code>-name B</code>. The -props option
+ points to a JGroups configuration file, and -name gives the member the name "B".
+ </para>
+ <para>
+ MPerf can then be run by pressing [1]. In this case, every member in the cluster (in the example, we
+ have members A and B) will send 1 million 1K messages. Once all messages have been received, MPerf will
+ write a summary of the performance results to stdout:
+ </para>
+ <screen>
+[1] Send [2] View
+[3] Set num msgs (1000000) [4] Set msg size (1KB) [5] Set threads (1)
+[6] New config (./fast.xml)
+[x] Exit this [X] Exit all
+1
+-- sending 1000000 msgs
+++ sent 100000
+-- received 200000 msgs (1410 ms, 141843.97 msgs/sec, 141.84MB/sec)
+++ sent 200000
+-- received 400000 msgs (1326 ms, 150829.56 msgs/sec, 150.83MB/sec)
+++ sent 300000
+-- received 600000 msgs (1383 ms, 144613.16 msgs/sec, 144.61MB/sec)
+++ sent 400000
+-- received 800000 msgs (1405 ms, 142348.75 msgs/sec, 142.35MB/sec)
+++ sent 500000
+-- received 1000000 msgs (1343 ms, 148920.33 msgs/sec, 148.92MB/sec)
+++ sent 600000
+-- received 1200000 msgs (1700 ms, 117647.06 msgs/sec, 117.65MB/sec)
+++ sent 700000
+-- received 1400000 msgs (1399 ms, 142959.26 msgs/sec, 142.96MB/sec)
+++ sent 800000
+-- received 1600000 msgs (1359 ms, 147167.03 msgs/sec, 147.17MB/sec)
+++ sent 900000
+-- received 1800000 msgs (1689 ms, 118413.26 msgs/sec, 118.41MB/sec)
+++ sent 1000000
+-- received 2000000 msgs (1519 ms, 131665.57 msgs/sec, 131.67MB/sec)
+
+Results:
+
+B: 2000000 msgs, 2GB received, msgs/sec=137608.37, throughput=137.61MB
+A: 2000000 msgs, 2GB received, msgs/sec=137959.58, throughput=137.96MB
+
+===============================================================================
+ Avg/node: 2000000 msgs, 2GB received, msgs/sec=137788.49, throughput=137.79MB
+ Avg/cluster: 4000000 msgs, 4GB received, msgs/sec=275576.99, throughput=275.58MB
+================================================================================
+
+
+[1] Send [2] View
+[3] Set num msgs (1000000) [4] Set msg size (1KB) [5] Set threads (1) [6] New config (./fast.xml)
+[x] Exit this [X] Exit all
+ </screen>
+ <para>
+ In the sample run above, we see member B's screen. B sends 1 million messages and waits for its
+ 1 million and the 1 million messages from B to be received before it dumps some stats to stdout. The
+ stats include the number of messages and bytes received, the time, the message rate and throughput
+ averaged over the 2 members. It also shows the aggregated performance over the entire cluster.
+ </para>
+ <para>
+ In the sample run above, we got an average 137MB of data per member per second, and an aggregated
+ 275MB per second for the entire cluster (A and B in this case).
+ </para>
+ <para>
+ Parameters such as the number of messages to be sent, the message size and the number of threads to be
+ used to send the messages can be configured by pressing the corresponding numbers. After pressing return,
+ the change will be broadcast to all cluster members, so that we don't have to go to each member and
+ apply the same change. Also, new members started, will fetch the current configuration and apply it.
+ </para>
+ <para>
+ For example, if we set the message size in A to 2000 bytes, then the change would be sent to B, which
+ would apply it as well. If we started a third member C, it would also have a configuration with a
+ message size of 2000.
+ </para>
+ <para>
+ Another feature is the ability to restart all cluster members with a new configuration. For example, if
+ we modified <code>./fast.xml</code>, we could select [6] to make all cluster members disconnect and
+ close their existing channels and start a new channel based on the modified fast.xml configuration.
+ </para>
+ <para>
+ The new configuration file doesn't even have to be accessible on all cluster members; only on the
+ member which makes the change. The file contents will be read by that member, converted into a byte buffer
+ and shipped to all cluster members, where the new channel will then be created with the byte buffer
+ (converted into an input stream) as config.
+ </para>
+ <para>
+ Being able to dynamically change the test parameters and the JGroups configuration makes MPerf suited to
+ be run in larger clusters; unless a new JGroups version is installed, MPerf will never have to be
+ restarted manually.
+ </para>
+ </section>
+ </section>
+
<section id="Ergonomics">
<title>Ergonomics</title>
<para>
View
77 src/org/jgroups/demos/dynamic/Arguments.java
@@ -1,77 +0,0 @@
-package org.jgroups.demos.dynamic;
-
-import org.jgroups.util.Streamable;
-import org.jgroups.util.Util;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.util.List;
-
-/**
- * Encapsulates the arguments submitted against a {@link Command}.
- * @author Bela Ban
- * @since 3.1
- */
-public class Arguments implements Streamable {
-
- protected String command_name;
-
- /** A list of arguments that are passed to {@link Command#invoke(Object[])} */
- protected Object[] args;
-
-
-
- public Arguments() {
- }
-
-
- public Arguments(String command_name, Object ... args) {
- this.command_name=command_name;
- this.args=args;
- }
-
-
- public Arguments(String command_name, List<Object> args) {
- this.command_name=command_name;
- if(args == null || args.isEmpty())
- return;
- this.args=new Object[args.size()];
- for(int i=0; i < this.args.length; i++)
- this.args[i]=args.get(i);
- }
-
- public String getCommandName() {
- return command_name;
- }
-
- public Object[] getArguments() {
- return args;
- }
-
- public void writeTo(DataOutput out) throws Exception {
- Util.writeString(command_name, out);
- if(args == null || args.length == 0) {
- out.writeShort(0);
- return;
- }
- out.writeShort(args.length);
- for(Object arg: args)
- Util.writeObject(arg, out);
- }
-
- public void readFrom(DataInput in) throws Exception {
- command_name=Util.readString(in);
- short num_args=in.readShort();
- if(num_args == 0)
- return;
- args=new Object[num_args];
- for(int i=0; i < num_args; i++) {
- Object arg=Util.readObject(in);
- args[i]=arg;
- }
- }
-
- public String toString() {
- return command_name + " " + (args != null? args: "");
- }
-}
View
32 src/org/jgroups/demos/dynamic/Command.java
@@ -1,32 +0,0 @@
-package org.jgroups.demos.dynamic;
-
-
-/**
- * Abstract class for all commands
- * @author Bela Ban
- * @since 3.1
- */
-public abstract class Command {
- protected DTest test;
-
- public DTest getTest() {
- return test;
- }
-
- public void setTest(DTest test) {
- this.test=test;
- }
-
- /**
- * Invokes a command against this command
- *
- * @param args
- * @return
- * @throws Exception
- */
- abstract public Object invoke(Object[] args) throws Exception;
-
-
- /** Returns a description of the command and its arguments */
- abstract public String help();
-}
View
184 src/org/jgroups/demos/dynamic/DTest.java
@@ -1,184 +0,0 @@
-package org.jgroups.demos.dynamic;
-
-import org.jgroups.*;
-import org.jgroups.conf.ClassConfigurator;
-import org.jgroups.util.Util;
-
-import java.io.IOException;
-import java.util.*;
-
-/**
- * Dynamic test which can be started on several nodes. The test uses commands to dynamically invoke
- * functionality, such as a perf test, or reconfiguration of the channel.
- * @author Bela Ban
- * @since 3.1
- */
-public class DTest extends ReceiverAdapter {
- public static final String DEFAULT_COMMANDS_PACKAGE="org.jgroups.demos.dynamic.commands";
- public static final short ID;
-
- static {
- ID=ClassConfigurator.getProtocolId(DTest.class);
- }
-
- protected JChannel ch;
-
- /** A hashmap of the registered commands */
- protected final Map<String,Command> commands=new HashMap<String,Command>();
-
- public JChannel getChannel() {
- return ch;
- }
-
- public void setChannel(JChannel ch) {
- if(ch != null) {
- this.ch=ch;
- this.ch.setReceiver(this);
- }
- }
-
- public void start(String props) throws Exception {
- ch=new JChannel(props);
- for(Command cmd: commands.values())
- cmd.setTest(this);
- ch.setReceiver(this);
- ch.connect("dtest-cluster");
- }
-
- protected void loop() throws IOException {
- for(;;) {
- System.out.print("> ");
- String line=Util.readLine(System.in);
- List<String> list=Util.parseStringList(line, " \t\n\r\f");
-
- String command_name=!list.isEmpty()? list.remove(0) : null;
- if(command_name == null)
- continue;
-
- List<Object> tmp=new ArrayList<Object>(list.size());
- tmp.addAll(list);
-
- Arguments args=new Arguments(command_name, tmp);
-
- // Process built-in commands first
- if(args.getCommandName().startsWith("exit"))
- break;
- if(args.getCommandName().startsWith("help")) {
- help();
- continue;
- }
-
- Command command=commands.get(args.getCommandName());
- if(command == null) {
- System.err.println("Command " + args.getCommandName() + " not found");
- continue;
- }
-
- try {
- command.invoke(args.getArguments());
- }
- catch(Exception e) {
- e.printStackTrace();
- }
- }
- Util.close(ch);
- }
-
- public void receive(Message msg) {
- DTestHeader hdr=(DTestHeader)msg.getHeader(ID);
- switch(hdr.type) {
- case DTestHeader.REQ:
- Arguments args=(Arguments)msg.getObject();
- Command command=commands.get(args.getCommandName());
- if(command == null)
- System.err.println("Command " + args.getCommandName() + " not found");
- else {
- Object rsp;
- try {
- rsp=command.invoke(args.getArguments());
- }
- catch(Exception e) {
- e.printStackTrace();
- rsp=e;
- }
- Message response=new Message(msg.getSrc(), null, rsp);
- response.putHeader(ID, new DTestHeader(DTestHeader.RSP));
- try {
- ch.send(response);
- }
- catch(Exception e) {
- e.printStackTrace();
- }
- }
- break;
- case DTestHeader.RSP:
- Object rsp=msg.getObject();
- System.out.println(" response from " + msg.getSrc() + ": " + rsp);
- break;
- default:
- System.err.println("received an invalid header " + hdr);
- break;
- }
- }
-
- public void viewAccepted(View view) {
- System.out.println("-- view: " + view);
- System.out.print("> ");
- }
-
- public void help() {
- System.out.println("DTest [-props <props>] [-commands_package <package name>]");
- System.out.println("help exit");
- System.out.println("Commands:");
- for(Map.Entry<String,Command> entry: commands.entrySet())
- System.out.println(entry.getKey() + ": " + entry.getValue().help());
- }
-
- protected void findAndRegisterCommands(Collection<String> packages) throws Exception {
- for(String package_name: packages) {
- Set<Class<Command>> classes=Util.findClassesAssignableFrom(package_name,Command.class);
- for(Class<Command> clazz: classes) {
- String name=clazz.getSimpleName();
- Command command=clazz.newInstance();
- if(commands.containsKey(name))
- throw new IllegalStateException("command " + name + " already exists");
- commands.put(name, command);
- }
- }
- }
-
-
- public static void main(String[] args) throws Exception {
- String props=null;
- Collection<String> packages=new HashSet<String>();
- DTest test=new DTest();
- boolean print_help=false;
-
- packages.add(DEFAULT_COMMANDS_PACKAGE);
- for(int i=0; i < args.length; i++) {
- if(args[i].equals("-props")) {
- props=args[++i];
- continue;
- }
- if(args[i].equals("-commmands_package")) {
- String tmp_package=args[++i];
- packages.add(tmp_package);
- }
- print_help=true;
- break;
- }
-
- test.findAndRegisterCommands(packages);
- if(print_help) {
- test.help();
- return;
- }
-
- test.start(props);
- test.loop();
- }
-
-
-
-
-}
View
44 src/org/jgroups/demos/dynamic/DTestHeader.java
@@ -1,44 +0,0 @@
-package org.jgroups.demos.dynamic;
-
-import org.jgroups.Global;
-import org.jgroups.Header;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-
-/**
- * Marks a message as either a request or a response. The arguments to the request or return object of the response
- * are in the payload of the message itself.
- * @author Bela Ban
- * @since 3.1
- */
-public class DTestHeader extends Header {
- public static final byte REQ = 1;
- public static final byte RSP = 2;
-
- protected byte type;
-
-
- public DTestHeader() {
- }
-
- public DTestHeader(byte type) {
- this.type=type;
- }
-
- public int size() {
- return Global.BYTE_SIZE;
- }
-
- public void writeTo(DataOutput out) throws Exception {
- out.writeByte(type);
- }
-
- public void readFrom(DataInput in) throws Exception {
- type=in.readByte();
- }
-
- public String toString() {
- return type == REQ? "REQ" : (type == RSP? "RSP" : "unknown");
- }
-}
View
35 src/org/jgroups/demos/dynamic/commands/all.java
@@ -1,35 +0,0 @@
-package org.jgroups.demos.dynamic.commands;
-
-import org.jgroups.Message;
-import org.jgroups.demos.dynamic.Arguments;
-import org.jgroups.demos.dynamic.Command;
-import org.jgroups.demos.dynamic.DTest;
-import org.jgroups.demos.dynamic.DTestHeader;
-
-/**
- * Invokes a command across the entire cluster
- * @author Bela Ban
- * @since 3.1
- */
-public class all extends Command {
- public Object invoke(Object[] args) throws Exception {
- if(args == null || args.length < 1)
- throw new IllegalArgumentException("arguments are incorrect");
-
- String command_name=(String)args[0];
- Object[] tmp=args.length > 1? new Object[args.length -1] : null;
- if(args.length > 1)
- System.arraycopy(args, 1, tmp, 0, args.length - 1);
-
- Arguments arguments=new Arguments(command_name, tmp);
- Message msg=new Message(null, null, arguments);
- msg.setFlag(Message.Flag.RSVP);
- msg.putHeader(DTest.ID, new DTestHeader(DTestHeader.REQ));
- test.getChannel().send(msg);
- return null;
- }
-
- public String help() {
- return "command [arg]*";
- }
-}
View
58 src/org/jgroups/demos/dynamic/commands/bounce.java
@@ -1,58 +0,0 @@
-package org.jgroups.demos.dynamic.commands;
-
-import org.jgroups.JChannel;
-import org.jgroups.demos.dynamic.Command;
-import org.jgroups.util.Util;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-
-/**
- * Restarts the channel, with the same or a different configuration
- * @author Bela Ban
- * @since 3.1
- */
-public class bounce extends Command {
- public Object invoke(Object[] args) throws Exception {
- if(args.length < 1)
- throw new IllegalArgumentException("need at least 1 arg (configuration)");
- Object config=args[0];
- JChannel ch=null;
-
- if(config instanceof byte[]) {
- InputStream input=new ByteArrayInputStream((byte[])config);
- ch=new JChannel(input);
- }
- else if(config instanceof String) {
- ch=new JChannel((String)config);
- }
-
- if(ch == null)
- return "couldn't initialize channel";
-
- final String cluster_name=test.getChannel().getClusterName();
-
- final JChannel tmp=ch;
- Thread thread=new Thread() {
- public void run() {
- Util.sleepRandom(2000, 5000);
- Util.close(test.getChannel());
- try {
- tmp.connect(cluster_name);
- }
- catch(Exception e) {
- e.printStackTrace();
- }
- test.setChannel(tmp);
- System.out.println("-- created new channel: " + tmp.getAddress());
- }
- };
- thread.start();
-
- return "about to create a new channel (in a few seconds)";
- }
-
- public String help() {
- return "bounce [<config file> | <byte buffer>]";
- }
-}
View
20 src/org/jgroups/demos/dynamic/commands/echo.java
@@ -1,20 +0,0 @@
-package org.jgroups.demos.dynamic.commands;
-
-import org.jgroups.demos.dynamic.Command;
-
-/**
- * @author bela
- * @since x.y
- */
-public class echo extends Command {
- public Object invoke(Object[] args) throws Exception {
- StringBuilder sb=new StringBuilder("hello " + args[0] + " !");
- if(args.length >= 2)
- sb.append("Oh, I see you are from " + args[1]);
- return sb.toString();
- }
-
- public String help() {
- return "name [town]";
- }
-}
View
38 src/org/jgroups/util/Util.java
@@ -1110,6 +1110,44 @@ public static String readContents(InputStream input) {
return sb.toString();
}
+ public static byte[] readFileContents(String filename) throws IOException {
+ File file=new File(filename);
+ if(!file.exists())
+ throw new FileNotFoundException(filename);
+ long length=file.length();
+ byte contents[]=new byte[(int)length];
+ InputStream in=new BufferedInputStream(new FileInputStream(filename));
+ int bytes_read=0;
+
+ for(;;) {
+ int tmp=in.read(contents, bytes_read, (int)(length - bytes_read));
+ if(tmp == -1)
+ break;
+ bytes_read+=tmp;
+ if(bytes_read == length)
+ break;
+ }
+ return contents;
+ }
+
+ public static byte[] readFileContents(InputStream input) throws IOException {
+ byte contents[]=new byte[10000], buf[]=new byte[1024];
+ InputStream in=new BufferedInputStream(input);
+ int bytes_read=0;
+
+ for(;;) {
+ int tmp=in.read(buf, 0, buf.length);
+ if(tmp == -1)
+ break;
+ System.arraycopy(buf, 0, contents, bytes_read, tmp);
+ bytes_read+=tmp;
+ }
+
+ byte[] retval=new byte[bytes_read];
+ System.arraycopy(contents, 0, retval, 0, bytes_read);
+ return retval;
+ }
+
public static String parseString(DataInput in) {
StringBuilder sb=new StringBuilder();
View
103 tests/perf/org/jgroups/tests/perf/Configuration.java
@@ -1,103 +0,0 @@
-package org.jgroups.tests.perf;
-
-import java.net.InetAddress;
-
-/**
- * Captures all config for IPerf
- * @author Bela Ban
- */
-public class Configuration {
- private int size=10 * 1000 * 1000;
- private long time=0; // time to send
- private int chunk_size=1000; // send in chunks of 1000 bytes
- private boolean sender=false;
- private String transport="org.jgroups.tests.perf.transports.JGroupsTransport";
- private String[] transport_args;
- private boolean jmx=false;
- private InetAddress bind_addr=null;
-
- public boolean isSender() {
- return sender;
- }
-
- public void setSender(boolean sender) {
- this.sender=sender;
- }
-
- public int getSize() {
- return size;
- }
-
- public void setSize(int size) {
- this.size=size;
- this.time=0;
- }
-
- public int getChunkSize() {
- return chunk_size;
- }
-
- public void setChunkSize(int chunk_size) {
- this.chunk_size=chunk_size;
- }
-
- public long getTime() {
- return time;
- }
-
- public void setTime(long time) {
- this.time=time;
- this.size=0;
- }
-
- public String getTransport() {
- return transport;
- }
-
- public InetAddress getBindAddress() {
- return bind_addr;
- }
-
- public void setBindAddress(InetAddress bind_addr) {
- this.bind_addr=bind_addr;
- }
-
- public boolean isJmx() {
- return jmx;
- }
-
- public void setJmx(boolean jmx) {
- this.jmx=jmx;
- }
-
- public void setTransport(final String transport) {
- if(transport == null) return;
- if(transport.equalsIgnoreCase("udp"))
- this.transport="org.jgroups.tests.perf.transports.UdpTransport";
- else if(transport.equalsIgnoreCase("tcp"))
- this.transport="org.jgroups.tests.perf.transports.TcpTransport";
- else if(transport.equalsIgnoreCase("jgroups"))
- this.transport="org.jgroups.tests.perf.transports.JGroupsTransport";
- else if(transport.equalsIgnoreCase("jgroupscluster"))
- this.transport="org.jgroups.tests.perf.transports.JGroupsClusterTransport";
- else
- this.transport=transport;
- }
-
- public String[] getTransportArgs() {
- return transport_args;
- }
-
- public void setTransportArgs(String[] transport_args) {
- this.transport_args=transport_args;
- }
-
- public String toString() {
- StringBuilder sb=new StringBuilder();
- sb.append("size=" + size).append("\n");
- sb.append("sender=" + sender).append("\n");
- sb.append("transport=" + transport + "\n");
- sb.append("jmx=" + jmx + "\n");
- return sb.toString();
- }
-}
View
164 tests/perf/org/jgroups/tests/perf/Data.java
@@ -1,164 +0,0 @@
-package org.jgroups.tests.perf;
-
-import org.jgroups.util.Streamable;
-import org.jgroups.util.Util;
-
-import java.io.*;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Data sent around between members
- * @author Bela Ban Jan 22
- * @author 2004
- */
-public class Data implements Streamable {
- final static byte DISCOVERY_REQ = 1;
- final static byte DISCOVERY_RSP = 2;
- final static byte DATA = 3;
- final static byte RESULTS = 4; // sent when a receiver has received all messages
- final static byte FINAL_RESULTS = 5; // sent when a sender is done
- final static byte FINAL_RESULTS_OK = 6; // sent when we know the everyone has received FINAL_MSGS
- final static byte START = 7; // start sending messages
- final static byte WARMUP = 8; // warmup messages
-
- public Data() {
- ;
- }
-
- public Data(byte type) {
- this.type=type;
- }
-
- byte type=0;
- byte[] payload=null; // used with DATA
- boolean sender=false; // used with DISCOVERY_RSP
- long num_msgs=0; // used with DISCOVERY_RSP
- MemberInfo result=null; // used with RESULTS
- Map<Object,MemberInfo> results=null; // used with final results
-
- public int getType() {
- return type;
- }
-
- public void writeTo(DataOutput out) throws Exception {
- out.writeByte(type);
- if(payload != null) {
- out.writeBoolean(true);
- out.writeInt(payload.length);
- out.write(payload, 0, payload.length);
- }
- else
- out.writeBoolean(false);
- out.writeBoolean(sender);
- out.writeLong(num_msgs);
-
- Util.writeStreamable(result, out);
-
- if(results != null) {
- out.writeBoolean(true);
- out.writeInt(results.size());
- Object key;
- MemberInfo val;
- for(Map.Entry<Object,MemberInfo> entry: results.entrySet()) {
- key=entry.getKey();
- val=entry.getValue();
- try {
- Util.writeObject(key, out);
- }
- catch(Exception e) {
- throw new IOException("failed to write object " + key);
- }
- Util.writeStreamable(val, out);
- }
- }
- else
- out.writeBoolean(false);
- }
-
- public void readFrom(DataInput in) throws Exception {
- type=in.readByte();
- if(in.readBoolean()) {
- int length=in.readInt();
- payload=new byte[length];
- in.readFully(payload, 0, length);
- }
- sender=in.readBoolean();
- num_msgs=in.readLong();
-
- result=(MemberInfo)Util.readStreamable(MemberInfo.class, in);
-
- if(in.readBoolean()) {
- int length=in.readInt();
- results=new HashMap(length);
- Object key;
- MemberInfo val;
- for(int i=0; i < length; i++) {
- try {
- key=Util.readObject(in);
- }
- catch(Exception e) {
- IOException ex=new IOException("failed to read key");
- ex.initCause(e);
- throw ex;
- }
- val=(MemberInfo)Util.readStreamable(MemberInfo.class, in);
- results.put(key, val);
- }
- }
- }
-
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeByte(type);
- if(payload != null) {
- out.writeInt(payload.length);
- out.write(payload, 0, payload.length);
- }
- else {
- out.writeInt(0);
- }
- out.writeBoolean(sender);
- out.writeLong(num_msgs);
- if(results != null) {
- out.writeBoolean(true);
- out.writeObject(results);
- }
- else
- out.writeBoolean(false);
- }
-
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- type=in.readByte();
- int len=in.readInt();
- if(len > 0) {
- payload=new byte[len];
- in.readFully(payload, 0, payload.length);
- }
- sender=in.readBoolean();
- num_msgs=in.readLong();
- boolean results_available=in.readBoolean();
- if(results_available)
- results=(Map)in.readObject();
- }
-
-
-
-
- public String toString() {
- StringBuilder sb=new StringBuilder();
- sb.append('[');
- switch(type) {
- case DISCOVERY_REQ: sb.append("DISCOVERY_REQ"); break;
- case DISCOVERY_RSP: sb.append("DISCOVERY_RSP"); break;
- case DATA: sb.append("DATA"); break;
- case RESULTS: sb.append("RESULTS"); break;
- case FINAL_RESULTS: sb.append("FINAL_RESULTS"); break;
- case FINAL_RESULTS_OK: sb.append("FINAL_RESULTS_OK"); break;
- case START: sb.append("START"); break;
- case WARMUP: sb.append("WARMUP"); break;
- default: sb.append("<unknown>"); break;
- }
- sb.append("] ");
- return sb.toString();
- }
-}
View
419 tests/perf/org/jgroups/tests/perf/IPerf.java
@@ -1,419 +0,0 @@
-package org.jgroups.tests.perf;
-
-import org.jgroups.*;
-import org.jgroups.util.Util;
-import org.jgroups.util.Tuple;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.Condition;
-import java.net.InetAddress;
-import java.text.NumberFormat;
-
-/**
- * Tests sending large messages from one sender to multiple receivers. The following messages are exchanged:
- * <pre>
- * | START (1 byte) |
- * | DATA (1 byte) | size (int, 4 bytes) | byte[] buf |
- * | STOP (1 byte) |
- * | RESULT (1 byte) | total time (long, 8 bytes) | total_bytes (long, 8 bytes)|
- * | REGISTER (1 byte) |
- *
- * </pre>
- * @author Bela Ban
- */
-public class IPerf implements Receiver {
- private final Configuration config;
- private final ConcurrentMap<Object,Entry> receiver_table=new ConcurrentHashMap<Object,Entry>();
- private Transport transport=null;
- private ResultSet results=null;
- private final Set<Object> members=new HashSet<Object>();
- private final static NumberFormat f;
-
-
- static {
- f=NumberFormat.getNumberInstance();
- f.setGroupingUsed(false);
- f.setMaximumFractionDigits(2);
- }
-
-
- public IPerf(Configuration config) {
- this.config=config;
- }
-
- public void start() throws Exception {
- transport=(Transport)Class.forName(config.getTransport()).newInstance();
- transport.create(config);
- transport.setReceiver(this);
- transport.start();
-
- members.add(transport.getLocalAddress());
- byte[] buf=createRegisterMessage();
- transport.send(null, buf, false);
-
-
- if(config.isSender()) {
- send();
- transport.stop();
- transport.destroy();
- }
- else {
- System.out.println("Transport " + transport.getClass().getName() + " started at " + new Date());
- System.out.println("Listening on " + transport.getLocalAddress());
- }
- }
-
-
- public void receive(Object sender, byte[] payload) {
- ByteBuffer buf=ByteBuffer.wrap(payload);
- byte b=buf.get();
- Type type=Type.getType(b);
- switch(type) {
- case START:
- receiver_table.remove(sender);
- receiver_table.putIfAbsent(sender, new Entry());
- break;
- case DATA:
- Entry entry=receiver_table.get(sender);
- if(entry == null) {
- err("entry for " + sender + " not found");
- return;
- }
- int length=buf.getInt();
- entry.total_bytes+=length;
- break;
- case STOP:
- entry=receiver_table.get(sender);
- if(entry == null) {
- err("entry for " + sender + " not found");
- return;
- }
- if(entry.stop_time == 0) {
- entry.stop_time=System.currentTimeMillis();
- }
- System.out.println("result for " + sender + ": " + entry);
- sendResult(sender, entry);
- break;
- case RESULT:
- long total_time=buf.getLong(), total_bytes=buf.getLong();
- results.add(sender, total_time, total_bytes);
- break;
- case REGISTER:
- members.add(sender);
- break;
- }
- }
-
-
-
-
- private void send() throws Exception {
- int size=config.getSize();
- long time=config.getTime();
- int chunk_size=config.getChunkSize();
-
- results=new ResultSet(members, size);
-
- byte[] buf=createStartMessage();
- transport.send(null, buf, false);
-
- if(time > 0) {
- ByteBuffer buffer=createDataBuffer(chunk_size);
- buffer.put(Type.DATA.getByte());
- buffer.putInt(chunk_size);
- buf=buffer.array();
-
- long end_time=System.currentTimeMillis() + time;
- while(System.currentTimeMillis() < end_time) {
- transport.send(null, buf, false);
- }
- }
- else {
- int sent=0;
- int to_send, remaining;
-
- while(sent < size) {
- remaining=size - sent;
- to_send=Math.min(remaining, chunk_size);
- buf=createDataMessage(to_send);
- transport.send(null, buf, false);
- sent+=to_send;
- }
- }
-
- buf=createStopMessage();
- transport.send(null, buf, false);
-
- boolean rc=results.block(30000L);
- if(rc)
- log("got all results");
- else
- err("didnt get all results");
- System.out.println("\nResults:\n" + results);
- results.reset();
- }
-
-
- private static byte[] createRegisterMessage() {
- return createMessage(Type.REGISTER);
- }
-
-
- private void sendResult(Object destination, Entry entry) {
- ByteBuffer buf=ByteBuffer.allocate(Global.BYTE_SIZE + Global.LONG_SIZE * 2);
- buf.put(Type.RESULT.getByte());
- buf.putLong(entry.stop_time - entry.start_time);
- buf.putLong(entry.total_bytes);
- try {
- transport.send(destination, buf.array(), false);
- }
- catch(Exception e) {
- err(e.toString());
- }
- }
-
-
- private static byte[] createStartMessage() {
- return createMessage(Type.START);
- }
-
-
- private static byte[] createDataMessage(int length) {
- ByteBuffer buf=ByteBuffer.allocate(Global.BYTE_SIZE + length + Global.INT_SIZE);
- buf.put(Type.DATA.getByte());
- buf.putInt(length);
- return buf.array();
- }
-
- private static ByteBuffer createDataBuffer(int length) {
- ByteBuffer buf=ByteBuffer.allocate(Global.BYTE_SIZE + length + Global.INT_SIZE);
- buf.put(Type.DATA.getByte());
- buf.putInt(length);
- return buf;
- }
-
-
- private static byte[] createStopMessage() {
- return createMessage(Type.STOP);
- }
-
-
- private static byte[] createMessage(Type type) {
- ByteBuffer buf=ByteBuffer.allocate(Global.BYTE_SIZE);
- buf.put(type.getByte());
- return buf.array();
- }
-
-
- public static void main(String[] args) throws Exception {
- Configuration config=new Configuration();
-
- List<String> unused_args=new ArrayList<String>(args.length);
-
- for(int i=0; i < args.length; i++) {
- String tmp=args[i];
- if(tmp.equalsIgnoreCase("-sender")) {
- config.setSender(true);
- continue;
- }
- if(tmp.equalsIgnoreCase("-size")) {
- config.setSize(Integer.parseInt(args[++i]));
- continue;
- }
- if(tmp.equals("-time")) {
- config.setTime(Long.parseLong(args[++i]));
- continue;
- }
- if(tmp.equals("-chunk_size")) {
- config.setChunkSize(Integer.parseInt(args[++i]));
- continue;
- }
- if(tmp.equals("-transport")) {
- config.setTransport(args[++i]);
- continue;
- }
- if(tmp.equals("-bind_addr")) {
- config.setBindAddress(InetAddress.getByName(args[++i]));
- continue;
- }
- if(tmp.equals("-h") || tmp.equals("-help")) {
- help(config.getTransport());
- return;
- }
- unused_args.add(tmp);
- }
-
- if(!unused_args.isEmpty()) {
- String[] tmp=new String[unused_args.size()];
- for(int i=0; i < unused_args.size(); i++)
- tmp[i]=unused_args.get(i);
- config.setTransportArgs(tmp);
- }
-
- new IPerf(config).start();
- }
-
- static void help(String transport) {
- StringBuilder sb=new StringBuilder();
- sb.append("IPerf [-sender] [-bind_addr <addr>] [-transport <class name>] [-size <bytes> | -time <ms>] [-chunk_size <bytes>]");
- try {
- Transport tp=(Transport)Class.forName(transport).newInstance();
- String tmp=tp.help();
- if(tmp != null && tmp.length() > 0)
- sb.append("\nTransport specific options for " + tp.getClass().getName() + ":\n" + tp.help());
- }
- catch(Exception e) {
- e.printStackTrace();
- }
- System.out.println(sb);
- }
-
- private static void log(String msg) {
- System.out.println(msg);
- }
-
- private static void err(String msg) {
- System.err.println(msg);
- }
-
-
-
- public static enum Type {
- START(1),
- DATA(2),
- STOP(3),
- RESULT(4),
- REGISTER(5);
-
- final byte b;
-
- Type(int i) {
- b=(byte)i;
- }
-
- public byte getByte() {
- return b;
- }
-
- public static Type getType(byte input) {
- switch(input) {
- case 1: return START;
- case 2: return DATA;
- case 3: return STOP;
- case 4: return RESULT;
- case 5: return REGISTER;
- }
- throw new IllegalArgumentException("type " + input + " is not valid");
- }
- }
-
- private static class Entry {
- private final long start_time;
- private long stop_time=0;
- private long total_bytes=0;
-
- public Entry() {
- this.start_time=System.currentTimeMillis();
- }
-
- public String toString() {
- double throughput=total_bytes / ((stop_time - start_time) / 1000.0);
- return stop_time - start_time + " ms for " + Util.printBytes(total_bytes) +
- " (" + Util.printBytes(throughput) + " / sec)";
- }
- }
-
- private static class ResultSet {
- private final Set<Object> not_heard_from;
- private final ConcurrentMap<Object, Tuple<Long,Long>> results=new ConcurrentHashMap<Object,Tuple<Long,Long>>();
- private final Lock lock=new ReentrantLock();
- private final Condition cond=lock.newCondition();
- private long expected_bytes=0;
-
-
- public ResultSet(Collection<Object> not_heard_from, long expected_bytes) {
- this.not_heard_from=new HashSet<Object>(not_heard_from); // make a copy
- this.expected_bytes=expected_bytes;
-
- }
-
- public boolean add(Object sender, long time, long total_bytes) {
- results.putIfAbsent(sender, new Tuple<Long,Long>(time, total_bytes));
- lock.lock();
- try {
- if(not_heard_from.remove(sender))
- cond.signalAll();
- return not_heard_from.isEmpty();
- }
- finally {
- lock.unlock();
- }
- }
-
- public boolean block(long timeout) {
- long target=System.currentTimeMillis() + timeout;
- long curr_time;
- lock.lock();
- try {
-
- while((curr_time=System.currentTimeMillis()) < target && !not_heard_from.isEmpty()) {
- long wait_time=target - curr_time;
- try {
- cond.await(wait_time, TimeUnit.MILLISECONDS);
- }
- catch(InterruptedException e) {
- ;
- }
- }
- return not_heard_from.isEmpty();
- }
- finally {
- lock.unlock();
- }
- }
-
- public int size() {
- return results.size();
- }
-
- public void reset() {
- lock.lock();
- try {
- not_heard_from.clear();
- results.clear();
- cond.signalAll();
- }
- finally {
- lock.unlock();;
- }
- }
-
- public String toString() {
- StringBuilder sb=new StringBuilder();
- for(Map.Entry<Object,Tuple<Long,Long>> entry: results.entrySet()) {
- Tuple<Long, Long> val=entry.getValue();
- sb.append(entry.getKey()).append(" time=" + val.getVal1() + " ms for " + Util.printBytes(val.getVal2()));
- if(expected_bytes > 0) {
- long total_received_bytes=val.getVal2();
- long missing=expected_bytes - total_received_bytes;
- double loss_rate=100.0 / expected_bytes * missing;
- double throughput=val.getVal2() / (val.getVal1() / 1000.0);
- sb.append(" (" + Util.printBytes(throughput) + " / sec, loss rate=" + f.format(loss_rate) + "%)");
- }
- sb.append("\n");
- }
- if(!not_heard_from.isEmpty())
- sb.append("(not heard from " + not_heard_from + ")\n");
- return sb.toString();
- }
- }
-
-
-
-}
View
74 tests/perf/org/jgroups/tests/perf/MPerf.java
@@ -9,8 +9,7 @@
import org.jgroups.util.Streamable;
import org.jgroups.util.Util;
-import java.io.DataInput;
-import java.io.DataOutput;
+import java.io.*;
import java.lang.reflect.Field;
import java.text.NumberFormat;
import java.util.*;
@@ -31,6 +30,7 @@
protected String props=null;
protected JChannel channel;
protected Address local_addr=null;
+ protected String name;
protected int num_msgs=1000 * 1000;
protected int msg_size=1000;
@@ -66,6 +66,7 @@
public void start(String props, String name) throws Exception {
this.props=props;
+ this.name=name;
StringBuilder sb=new StringBuilder();
sb.append("\n\n----------------------- MPerf -----------------------\n");
sb.append("Date: ").append(new Date()).append('\n');
@@ -90,12 +91,12 @@ protected void loop() {
int c;
final String INPUT="[1] Send [2] View\n" +
- "[3] Set num msgs (%d) [4] Set msg size (%s) [5] Set threads (%d)\n" +
+ "[3] Set num msgs (%d) [4] Set msg size (%s) [5] Set threads (%d) [6] New config (%s)\n" +
"[x] Exit this [X] Exit all ";
while(looping) {
try {
- c=Util.keyPress(String.format(INPUT, num_msgs, Util.printBytes(msg_size), num_threads));
+ c=Util.keyPress(String.format(INPUT, num_msgs, Util.printBytes(msg_size), num_threads, props));
switch(c) {
case '1':
results.reset(members);
@@ -117,6 +118,9 @@ protected void loop() {
case '5':
configChange("num_threads");
break;
+ case '6':
+ newConfig();
+ break;
case 'x':
looping=false;
break;
@@ -163,7 +167,16 @@ protected void configChange(String name) throws Exception {
if(tmp < 1)
throw new IllegalArgumentException("illegal value");
ConfigChange change=new ConfigChange(name, tmp);
- send(null,change,MPerfHeader.CONFIG_CHANGE,true);
+ send(null, change, MPerfHeader.CONFIG_CHANGE, true);
+ }
+
+ protected void newConfig() throws Exception {
+ String filename=Util.readStringFromStdin("Config file: ");
+ InputStream input=findFile(filename);
+ byte[] contents=Util.readFileContents(input);
+ send(null, contents, MPerfHeader.NEW_CONFIG, false);
+ ConfigChange change=new ConfigChange("props", filename);
+ send(null, change, MPerfHeader.CONFIG_CHANGE, true);
}
@@ -177,7 +190,7 @@ protected void send(Address target, Object payload, byte header, boolean rsvp) t
}
- private static String printProperties() {
+ protected static String printProperties() {
StringBuilder sb=new StringBuilder();
Properties p=System.getProperties();
for(Iterator it=p.entrySet().iterator(); it.hasNext();) {
@@ -187,6 +200,24 @@ private static String printProperties() {
return sb.toString();
}
+ protected static InputStream findFile(String filename) {
+ try {return new FileInputStream(filename);} catch(FileNotFoundException e) {}
+
+ File file=new File(filename);
+ String name=file.getName();
+ try {return new FileInputStream(name);} catch(FileNotFoundException e) {}
+
+ try {
+ String home_dir=System.getProperty("user.home");
+ filename=home_dir + File.separator + name;
+ try {return new FileInputStream(filename);} catch(FileNotFoundException e) {}
+ }
+ catch(Throwable t) {
+ }
+
+ return Util.getResourceAsStream(name, MPerf.class);
+ }
+
public void stop() {
looping=false;
Util.close(channel);
@@ -277,6 +308,10 @@ public void receive(Message msg) {
stack.destroy();
break;
+ case MPerfHeader.NEW_CONFIG:
+ applyNewConfig(msg.getBuffer());
+ break;
+
default:
System.err.println("Header type " + hdr.type + " not recognized");
}
@@ -310,6 +345,30 @@ protected void handleData(Address src, int length) {
}
+ protected void applyNewConfig(byte[] buffer) {
+ final InputStream in=new ByteArrayInputStream(buffer);
+ Thread thread=new Thread() {
+ public void run() {
+ try {
+ JChannel ch=new JChannel(in);
+ Util.sleepRandom(1000, 5000);
+ Util.close(channel);
+ channel=ch;
+ channel.setName(name);
+ channel.setReceiver(MPerf.this);
+ channel.connect("mperf");
+ local_addr=channel.getAddress();
+ }
+ catch(Exception e) {
+ System.err.println("failed creating new channel");
+ }
+ }
+ };
+
+ System.out.println("<< restarting channel");
+ thread.start();
+ }
+
protected void handleConfigChange(ConfigChange config_change) {
String attr_name=config_change.attr_name;
try {
@@ -566,6 +625,7 @@ public String toString() {
protected static final byte CONFIG_REQ = 7;
protected static final byte CONFIG_RSP = 8;
protected static final byte EXIT = 9;
+ protected static final byte NEW_CONFIG = 10;
protected byte type;
@@ -596,7 +656,7 @@ public static void main(String[] args) {
final MPerf test=new MPerf();
try {
- test.start(props,name);
+ test.start(props, name);
// this kludge is needed in order to terminate the program gracefully when 'X' is pressed
// (otherwise System.in.read() would not terminate)
View
84 tests/perf/org/jgroups/tests/perf/MemberInfo.java
@@ -1,84 +0,0 @@
-package org.jgroups.tests.perf;
-
-import org.jgroups.util.Streamable;
-import org.jgroups.util.Util;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.text.NumberFormat;
-
-/**
- * @author Bela Ban
- */
-public class MemberInfo implements Streamable {
- public long start=0;
- public long stop=0;
- public long num_msgs_expected=0;
- public long num_msgs_received=0;
- boolean done=false;
- long total_bytes_received=0;
-
- static NumberFormat f;
-
- static {
- f=NumberFormat.getNumberInstance();
- f.setGroupingUsed(false);
- f.setMaximumFractionDigits(2);
- }
-
- public MemberInfo() {
- }
-
- public MemberInfo(long num_msgs_expected) {
- this.num_msgs_expected=num_msgs_expected;
- }
-
- public double getMessageSec() {
- long total_time=stop-start;
- return num_msgs_received / (total_time/1000.0);
- }
-
- public long getTime() {
- return stop-start;
- }
-
-
-
- public String toString() {
- StringBuilder sb=new StringBuilder();
- double msgs_sec, throughput=0;
- long total_time=stop-start;
- double loss_rate=0;
- long missing_msgs=num_msgs_expected - num_msgs_received;
- msgs_sec=num_msgs_received / (total_time/1000.0);
- throughput=total_bytes_received / (total_time / 1000.0);
- loss_rate=missing_msgs >= num_msgs_expected? 100.0 : (100.0 / num_msgs_expected) * missing_msgs;
- sb.append("num_msgs_expected=").append(num_msgs_expected).append(", num_msgs_received=");
- sb.append(num_msgs_received);
- sb.append(" (loss rate=").append(loss_rate).append("%)");
- sb.append(", received=").append(Util.printBytes(total_bytes_received));
- sb.append(", time=").append(f.format(total_time)).append("ms");
- sb.append(", msgs/sec=").append(f.format(msgs_sec));
- sb.append(", throughput=").append(Util.printBytes(throughput));
- return sb.toString();
- }
-
- public void writeTo(DataOutput out) throws Exception {
- out.writeLong(start);
- out.writeLong(stop);
- out.writeLong(num_msgs_expected);
- out.writeLong(num_msgs_received);
- out.writeBoolean(done);
- out.writeLong(total_bytes_received);
- }
-
- public void readFrom(DataInput in) throws Exception {
- start=in.readLong();
- stop=in.readLong();
- num_msgs_expected=in.readLong();
- num_msgs_received=in.readLong();
- done=in.readBoolean();
- total_bytes_received=in.readLong();
-
- }
-}
View
9 tests/perf/org/jgroups/tests/perf/Receiver.java
@@ -1,9 +0,0 @@
-package org.jgroups.tests.perf;
-
-/**
- * @author Bela Ban Jan 22
- * @author 2004
- */
-public interface Receiver {
- void receive(Object sender, byte[] payload);
-}
View
959 tests/perf/org/jgroups/tests/perf/Test.java
@@ -1,959 +0,0 @@
-package org.jgroups.tests.perf;
-
-import org.jgroups.Version;
-import org.jgroups.logging.Log;
-import org.jgroups.logging.LogFactory;
-import org.jgroups.util.Util;
-
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.text.NumberFormat;
-import java.util.*;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/** You start the test by running this class.
- * @author Bela Ban (belaban@yahoo.com)
-
- */
-public class Test implements Receiver {
- String props=null;
- Properties config;