Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

- Added ConcurrentLinkedBlockingQueue2

- Added TP.SenderSendsBundler
- Added dont_bundle option

- Added UnicastTestTcp (TCP based equivalent of UnicastTest)

- Got rid of inefficient Data classes i UnicastTest*

- Added UUID.getByName()

- Set TCP.tcp_nodelay=false by default

- Added PERF protocol
- Changed Average to do round-robin storing of samples rather than random

- Added UnicastTestSharedLoopback to test unicast msg perf between 2 members within the same process (no network comm)

- Added Pool and PoolTest

- Creating an output stream per bundler

- Setting use_send_queue to false by default

- Added ByteBuffer{Input,Output}Stream and test

- Added TCPConnectionMap.flush(dest)

- Reverted used_flush_queues to true (default)
  • Loading branch information...
commit 7747153d3f55f3f2722a186fdb08ffd92936f57c 1 parent b58fd4b
@belaban authored
Showing with 1,988 additions and 244 deletions.
  1. +1 −0  conf/jg-magic-map.xml
  2. +1 −1  conf/jg-messages.properties
  3. +1 −0  conf/jg-protocol-ids.xml
  4. +2 −2 conf/tcp.xml
  5. +27 −14 src/org/jgroups/blocks/TCPConnectionMap.java
  6. +2 −2 src/org/jgroups/protocols/BasicTCP.java
  7. +117 −0 src/org/jgroups/protocols/PERF.java
  8. +222 −37 src/org/jgroups/protocols/TP.java
  9. +18 −4 src/org/jgroups/util/Average.java
  10. +187 −0 src/org/jgroups/util/ByteBufferInputStream.java
  11. +143 −0 src/org/jgroups/util/ByteBufferOutputStream.java
  12. +182 −0 src/org/jgroups/util/ConcurrentLinkedBlockingQueue2.java
  13. +4 −3 src/org/jgroups/util/ExposedByteArrayOutputStream.java
  14. +4 −3 src/org/jgroups/util/ExposedDataOutputStream.java
  15. +105 −0 src/org/jgroups/util/Pool.java
  16. +4 −0 src/org/jgroups/util/UUID.java
  17. +40 −0 tests/junit-functional/org/jgroups/tests/ByteBufferOutputStreamTest.java
  18. +96 −0 tests/junit-functional/org/jgroups/tests/PoolTest.java
  19. +81 −154 tests/other/org/jgroups/tests/UnicastTest.java
  20. +44 −0 tests/other/org/jgroups/tests/UnicastTestSharedLoopback.java
  21. +327 −0 tests/other/org/jgroups/tests/UnicastTestTcp.java
  22. +366 −0 tests/other/org/jgroups/tests/UnicastTestTcpSlow.java
  23. +14 −24 tests/perf/org/jgroups/tests/perf/JPerf.java
View
1  conf/jg-magic-map.xml
@@ -68,4 +68,5 @@
<class id="106" name="org.jgroups.auth.X509Token"/>
<class id="107" name="org.jgroups.protocols.UNICAST3$Header"/>
<class id="108" name="org.jgroups.protocols.FORK$ForkHeader"/>
+ <class id="109" name="org.jgroups.protocols.PERF$PerfHeader"/>
</magic-number-class-mapping>
View
2  conf/jg-messages.properties
@@ -27,7 +27,7 @@ ParseFailure = JGRP000024: warning during parse
SyspropFailure = JGRP000025: failed getting system property for %s: %s
OnlyLoopbackFound = JGRP000026: unable to find an address other than loopback for IP version %s
PassUpFailure = JGRP000027: failed passing message up
-UnknownBundler = JGRP000028: bundler_type %s not known; using default bundler (new3)
+UnknownBundler = JGRP000028: bundler_type %s not known; using default bundler
SendFailure = JGRP000029: %s: failed sending message to %s (%d bytes): %s, headers: %s
IncomingMsgFailure = JGRP000030: %s: failed handling incoming message: %s
IncorrectDest = JGRP000031: %s: dropping unicast %s from %s to wrong destination %s, headers are: %s
View
1  conf/jg-protocol-ids.xml
@@ -59,6 +59,7 @@
<class id="63" name="org.jgroups.protocols.rules.SUPERVISOR"/>
<class id="64" name="org.jgroups.protocols.UNICAST3"/>
<class id="65" name="org.jgroups.protocols.FORK"/>
+ <class id="66" name="org.jgroups.protocols.PERF"/>
<!-- IDs reserved for building blocks -->
<class id="200" name="org.jgroups.blocks.RequestCorrelator"/> <!-- ID should be the same as Global.BLOCKS_START_ID -->
View
4 conf/tcp.xml
@@ -27,8 +27,8 @@
thread_pool.min_threads="1"
thread_pool.max_threads="10"
thread_pool.keep_alive_time="5000"
- thread_pool.queue_enabled="false"
- thread_pool.queue_max_size="100"
+ thread_pool.queue_enabled="true"
+ thread_pool.queue_max_size="10000"
thread_pool.rejection_policy="discard"
oob_thread_pool.enabled="true"
View
41 src/org/jgroups/blocks/TCPConnectionMap.java
@@ -18,7 +18,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
@@ -39,14 +38,14 @@
protected Log log=LogFactory.getLog(getClass());
protected int recv_buf_size=120000;
protected int send_buf_size=60000;
- protected int send_queue_size=0;
+ protected int send_queue_size=2000;
protected int sock_conn_timeout=1000; // max time in millis to wait for Socket.connect() to return
protected int peer_addr_read_timeout=2000; // max time in milliseconds to block on reading peer address
protected boolean tcp_nodelay=false;
protected int linger=-1;
protected final Thread acceptor;
protected final AtomicBoolean running=new AtomicBoolean(false);
- protected volatile boolean use_send_queues=false;
+ protected volatile boolean use_send_queues=true;
protected SocketFactory socket_factory=new DefaultSocketFactory();
@@ -110,7 +109,7 @@ else if(bind_addr != null)
acceptor=f.newThread(new Acceptor(),"ConnectionMap.Acceptor [" + local_addr + "]");
}
-
+
public Address getLocalAddress() {return local_addr;}
public Receiver getReceiver() {return recvr;}
public void setReceiver(Receiver receiver) {this.recvr=receiver;}
@@ -208,6 +207,13 @@ public void send(Address dest, byte[] data, int offset, int length) throws Excep
}
}
+ /** Flushes the TCPConnection associated with destination */
+ public void flush(Address destination) throws Exception {
+ TCPConnection conn=mapper.getConnection(destination);
+ if(conn != null)
+ conn.flush();
+ }
+
public void start() throws Exception {
if(running.compareAndSet(false, true)) {
acceptor.start();
@@ -349,7 +355,7 @@ protected static String explanation(boolean connection_existed, boolean replace)
public class TCPConnection implements Connection {
protected final Socket sock; // socket to/from peer (result of srv_sock.accept() or new Socket())
- protected final Lock send_lock=new ReentrantLock(); // serialize send()
+ protected final ReentrantLock send_lock=new ReentrantLock(); // serialize send()
protected final byte[] cookie= { 'b', 'e', 'l', 'a' };
protected DataOutputStream out;
protected DataInputStream in;
@@ -373,10 +379,10 @@ public TCPConnection(Socket s) throws Exception {
setSocketParameters(s);
this.out=new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
this.in=new DataInputStream(new BufferedInputStream(s.getInputStream()));
- this.peer_addr=readPeerAddress(s);
+ this.peer_addr=readPeerAddress(s);
this.sock=s;
- }
-
+ }
+
protected Address getPeerAddress() {
return peer_addr;
}
@@ -446,7 +452,7 @@ protected void send(byte[] data, int offset, int length) throws Exception {
sender.addToQueue(tmp);
}
else
- _send(data, offset, length, true);
+ _send(data, offset, length, true, true);
}
/**
@@ -458,11 +464,11 @@ protected void send(byte[] data, int offset, int length) throws Exception {
* @param acquire_lock
* @throws Exception
*/
- protected void _send(byte[] data, int offset, int length, boolean acquire_lock) throws Exception {
+ protected void _send(byte[] data, int offset, int length, boolean acquire_lock, boolean flush) throws Exception {
if(acquire_lock)
send_lock.lock();
try {
- doSend(data, offset, length);
+ doSend(data, offset, length, acquire_lock, flush);
updateLastAccessed();
}
catch(InterruptedException iex) {
@@ -474,10 +480,17 @@ protected void _send(byte[] data, int offset, int length, boolean acquire_lock)
}
}
- protected void doSend(byte[] data, int offset, int length) throws Exception {
+ protected void doSend(byte[] data, int offset, int length, boolean acquire_lock, boolean flush) throws Exception {
out.writeInt(length); // write the length of the data buffer first
out.write(data,offset,length);
- out.flush(); // may not be very efficient (but safe)
+ if(!flush || (acquire_lock && send_lock.hasQueuedThreads()))
+ return; // don't flush as some of the waiting threads will do the flush, or flush is false
+ out.flush(); // may not be very efficient (but safe)
+ }
+
+ protected void flush() throws Exception {
+ if(out != null)
+ out.flush();
}
/**
@@ -643,7 +656,7 @@ public void run() {
if(data != null) {
try {
- _send(data, 0, data.length, false);
+ _send(data, 0, data.length, false, send_queue.isEmpty());
}
catch(Throwable ignored) {
}
View
4 src/org/jgroups/protocols/BasicTCP.java
@@ -31,7 +31,7 @@
protected boolean use_send_queues=true;
@Property(description="Max number of messages in a send queue")
- protected int send_queue_size=10000;
+ protected int send_queue_size=2000;
@Property(description="Receiver buffer size in bytes")
protected int recv_buf_size=150000;
@@ -46,7 +46,7 @@
protected int peer_addr_read_timeout=1000; // max time to block on reading of peer address
@Property(description="Should TCP no delay flag be turned on")
- protected boolean tcp_nodelay=true;
+ protected boolean tcp_nodelay=false;
@Property(description="SO_LINGER in msec. Default of -1 disables it")
protected int linger=-1; // SO_LINGER (number of ms, -1 disables it)
View
117 src/org/jgroups/protocols/PERF.java
@@ -0,0 +1,117 @@
+package org.jgroups.protocols;
+
+import org.jgroups.*;
+import org.jgroups.annotations.MBean;
+import org.jgroups.annotations.ManagedAttribute;
+import org.jgroups.annotations.Property;
+import org.jgroups.stack.Protocol;
+import org.jgroups.util.Average;
+import org.jgroups.util.MessageBatch;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+
+/**
+ * Protocol measuring latency between stacks. On {@link Protocol#down(org.jgroups.Event)}, a header is added to the
+ * message with the ID of the PERF protocol and the start time is set in the header.
+ * On {@link Protocol#up(org.jgroups.Event)}, the time different is computed and a rolling average is updated in PERF.<p/>
+ * Note that we can have several measurements by inserting PERF protocols with different IDs (Protocol.id) into the stack.</p>
+ * If PERF is used to measure latency between nodes running on different physical boxes, it is important that the clocks
+ * are synchronized, or else latency cannot be computed correctly (may even be negative).
+ * @author Bela Ban
+ * @since 3.5
+ */
+@MBean(description="Measures latency between PERF instances")
+public class PERF extends Protocol {
+ protected Average avg;
+ protected Address local_addr;
+
+ @Property(description="Number of samples to maintain for rolling average")
+ protected int avg_size=20;
+
+ @ManagedAttribute(description="Average latency in ns")
+ public double latencyInNs() {return avg.getAverage();}
+
+ @ManagedAttribute(description="Average latency in ms")
+ public double latencyInMs() {return avg.getAverage() / 1000000.0;}
+
+ public void init() throws Exception {
+ super.init();
+ avg=new Average(avg_size);
+ }
+
+ public void resetStats() {
+ super.resetStats();
+ avg.clear();
+ }
+
+ public Object down(Event evt) {
+ switch(evt.getType()) {
+ case Event.MSG:
+ Message msg=(Message)evt.getArg();
+ msg.putHeader(id, new PerfHeader(System.nanoTime()));
+ break;
+ case Event.SET_LOCAL_ADDRESS:
+ local_addr=(Address)evt.getArg();
+ break;
+ }
+ return down_prot.down(evt);
+ }
+
+ public Object up(Event evt) {
+ if(evt.getType() == Event.MSG) {
+ Message msg=(Message)evt.getArg();
+ PerfHeader hdr=(PerfHeader)msg.getHeader(id);
+ if(hdr == null)
+ log.error("%s: no perf header found", local_addr);
+ else {
+ long time=System.nanoTime() - hdr.start_time;
+ if(time <= 0)
+ log.error("%s: time is <= 0");
+ else
+ avg.add(time);
+ }
+ }
+ return up_prot.up(evt);
+ }
+
+ public void up(MessageBatch batch) {
+ for(Message msg: batch) {
+ PerfHeader hdr=(PerfHeader)msg.getHeader(id);
+ if(hdr == null)
+ log.error("%s: no perf header found", local_addr);
+ else {
+ long time=System.nanoTime() - hdr.start_time;
+ if(time <= 0)
+ log.error("%s: time is <= 0");
+ else
+ avg.add(time);
+ }
+ }
+
+ super.up(batch);
+ }
+
+ protected static class PerfHeader extends Header {
+ protected long start_time; // in ns
+
+ public PerfHeader() {
+ }
+
+ public PerfHeader(long start_time) {
+ this.start_time=start_time;
+ }
+
+ public int size() {
+ return Global.LONG_SIZE;
+ }
+
+ public void writeTo(DataOutput out) throws Exception {
+ out.writeLong(start_time);
+ }
+
+ public void readFrom(DataInput in) throws Exception {
+ start_time=in.readLong();
+ }
+ }
+}
View
259 src/org/jgroups/protocols/TP.java
@@ -21,6 +21,7 @@
import java.text.NumberFormat;
import java.util.*;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
@@ -55,6 +56,7 @@
protected static final byte LIST=1; // we have a list of messages rather than a single message when set
protected static final byte MULTICAST=2; // message is a multicast (versus a unicast) message when set
protected static final int MSG_OFFSET=Global.SHORT_SIZE + Global.BYTE_SIZE*2; // offset for flags for single msgs
+ protected static final int MSG_OVERHEAD=Global.SHORT_SIZE + Global.BYTE_SIZE; // version + flags
protected static final boolean can_bind_to_mcast_addr; // are we running on Linux ?
@@ -162,7 +164,7 @@
protected long oob_thread_pool_keep_alive_time=30000;
@Property(name="oob_thread_pool.queue_enabled", description="Use queue to enqueue incoming OOB messages")
- protected boolean oob_thread_pool_queue_enabled=true;
+ protected boolean oob_thread_pool_queue_enabled=false;
@Property(name="oob_thread_pool.queue_max_size",description="Maximum queue size for incoming OOB messages")
protected int oob_thread_pool_queue_max_size=500;
@@ -188,7 +190,7 @@
@Property(name="thread_pool.queue_max_size", description="Maximum queue size for incoming OOB messages")
- protected int thread_pool_queue_max_size=500;
+ protected int thread_pool_queue_max_size=10000;
@Property(name="thread_pool.rejection_policy",
description="Thread rejection policy. Possible values are Abort, Discard, DiscardOldest and Run")
@@ -263,8 +265,7 @@
@Property(description="Allows the transport to pass received message batches up as MessagesBatch instances " +
"(up(MessageBatch)), rather than individual messages. This flag will be removed in a future version " +
- "when batching has been implemented by all protocols",deprecatedMessage="bundling is enabled by default")
- @Deprecated
+ "when batching has been implemented by all protocols")
protected boolean enable_batching=true;
@Property(description="Whether or not messages with DONT_BUNDLE set should be ignored by default (JGRP-1737). " +
@@ -336,8 +337,9 @@
@Property(name="max_bundle_timeout", description="Max number of milliseconds until queued messages are sent")
protected long max_bundle_timeout=20;
- @Property(description="The type of bundler used. Has to be \"old\" or \"new\" (default)")
- protected String bundler_type="new";
+ @Property(description="The type of bundler used. Has to be \"sender-sends-with-timer\", \"transfer-queue\" (default) " +
+ "or \"sender-sends\"")
+ protected String bundler_type="transfer-queue";
@Property(description="The max number of elements in a bundler if the bundler supports size limitations")
protected int bundler_capacity=20000;
@@ -568,20 +570,20 @@ public void clearDifferentVersionCache() {
protected BlockingQueue<Runnable> internal_thread_pool_queue;
// ================================== Timer thread pool =========================
- protected TimeScheduler timer;
+ protected TimeScheduler timer;
- protected ThreadFactory timer_thread_factory;
+ protected ThreadFactory timer_thread_factory;
// ================================ Default thread factory ========================
/** Used by all threads created by JGroups outside of the thread pools */
- protected ThreadFactory global_thread_factory=null;
+ protected ThreadFactory global_thread_factory=null;
// ================================= Default SocketFactory ========================
- protected SocketFactory socket_factory=new DefaultSocketFactory();
+ protected SocketFactory socket_factory=new DefaultSocketFactory();
- protected Bundler bundler;
+ protected Bundler bundler;
- protected DiagnosticsHandler diag_handler;
+ protected DiagnosticsHandler diag_handler;
protected final List<DiagnosticsHandler.ProbeHandler> preregistered_probe_handlers=new LinkedList<DiagnosticsHandler.ProbeHandler>();
/**
@@ -593,7 +595,7 @@ public void clearDifferentVersionCache() {
/** The header including the cluster name, sent with each message. Not used with a shared transport (instead
* TP.ProtocolAdapter attaches the header to the message */
- protected TpHeader header;
+ protected TpHeader header;
/**
@@ -687,6 +689,10 @@ public void setDiagnosticsHandler(DiagnosticsHandler handler) {
}
}
+ /** Installs a bundler. Needs to be done before the channel is connected */
+ public void setBundler(Bundler bundler) {
+ this.bundler=bundler;
+ }
public void setThreadPoolQueueEnabled(boolean flag) {thread_pool_queue_enabled=flag;}
@@ -1158,20 +1164,25 @@ public void start() throws Exception {
if(enable_diagnostics)
startDiagnostics();
- if(bundler_type.startsWith("new")) {
- if(bundler_type.endsWith("new2"))
- log.warn(Util.getMessage("OldBundlerType"), "new2", "TransferQueueBundler (new)");
- bundler=new TransferQueueBundler(bundler_capacity);
- }
- else if(bundler_type.startsWith("old")) {
- if(bundler_type.endsWith("old2"))
- log.warn(Util.getMessage("OldBundlerType"), "old2", "DefaultBundler (old)");
- bundler=new DefaultBundler();
+ if(bundler == null) {
+ if(bundler_type.startsWith("sender-sends-with-timer") || bundler_type.startsWith("old")) {
+ if(bundler_type.startsWith("old"))
+ log.warn(Util.getMessage("OldBundlerType"), bundler_type, "sender-sends-with-timer");
+ bundler=new SenderSendsWithTimerBundler();
+ }
+ else if(bundler_type.startsWith("transfer-queue") || bundler_type.startsWith("new")) {
+ if(bundler_type.startsWith("new"))
+ log.warn(Util.getMessage("OldBundlerType"), bundler_type, "transfer-queue");
+ bundler=new TransferQueueBundler(bundler_capacity);
+ }
+ else if(bundler_type.startsWith("sender-sends")) {
+ bundler=new SenderSendsBundler();
+ }
+ else
+ log.warn(Util.getMessage("UnknownBundler"), bundler_type);
+ if(bundler == null)
+ bundler=new TransferQueueBundler(bundler_capacity);
}
- else
- log.warn(Util.getMessage("UnknownBundler"), bundler_type);
- if(bundler == null)
- bundler=new TransferQueueBundler(bundler_capacity);
bundler.start();
// local_addr is null when shared transport
@@ -1727,7 +1738,7 @@ protected void send(Message msg, Address dest, boolean multicast) throws Excepti
// bundle all messages, even the ones tagged with DONT_BUNDLE, except if we use the old bundler (DefaultBundler)
// JIRA: https://issues.jboss.org/browse/JGRP-1737
boolean bypass_bundling=msg.isFlagSet(Message.Flag.DONT_BUNDLE) &&
- (!ignore_dont_bundle || bundler instanceof DefaultBundler || dest instanceof PhysicalAddress);
+ (!ignore_dont_bundle || bundler instanceof SenderSendsWithTimerBundler || dest instanceof PhysicalAddress);
if(!bypass_bundling) {
bundler.send(msg);
return;
@@ -1735,10 +1746,10 @@ protected void send(Message msg, Address dest, boolean multicast) throws Excepti
// we can create between 300'000 - 400'000 output streams and do the marshalling per second,
// so this is not a bottleneck !
- ExposedByteArrayOutputStream out_stream=new ExposedByteArrayOutputStream((int)(msg.size() + 50));
- ExposedDataOutputStream dos=new ExposedDataOutputStream(out_stream);
+ ExposedByteArrayOutputStream out_stream=new ExposedByteArrayOutputStream((int)(msg.size() + MSG_OVERHEAD)); // version+flag+msg
+ DataOutputStream dos=new DataOutputStream(out_stream); // ok, the locks are uncontended, so this is fast
writeMessage(msg, dos, multicast);
- Buffer buf=new Buffer(out_stream.getRawBuffer(), 0, out_stream.size());
+ Buffer buf=out_stream.getBuffer();
doSend(buf, dest, multicast);
// we don't need to close() or flush() any of the 2 streams above, as these ops are no-ops
}
@@ -2223,10 +2234,14 @@ public void clearLogicalAddressCache() {
* message is added that doesn't exceed the max size, but then no further messages are added, so elapsed time
* will trigger the sending, not exceeding of the max size.
*/
- protected class DefaultBundler implements Bundler {
+ protected class SenderSendsWithTimerBundler implements Bundler {
static final int MIN_NUMBER_OF_BUNDLING_TASKS=2;
/** Keys are destinations, values are lists of Messages */
final Map<SingletonAddress,List<Message>> msgs=new HashMap<SingletonAddress,List<Message>>(36);
+
+ final ExposedByteArrayOutputStream bundler_out_stream=new ExposedByteArrayOutputStream(1024);
+ final ExposedDataOutputStream bundler_dos=new ExposedDataOutputStream(bundler_out_stream);
+
@GuardedBy("lock")
long count=0; // current number of bytes accumulated
int num_msgs=0;
@@ -2301,9 +2316,6 @@ private void sendBundledMessages(final Map<SingletonAddress,List<Message>> msgs)
log.trace(sb);
}
- ExposedByteArrayOutputStream bundler_out_stream=new ExposedByteArrayOutputStream((int)(count + 50));
- ExposedDataOutputStream bundler_dos=new ExposedDataOutputStream(bundler_out_stream);
-
for(Map.Entry<SingletonAddress,List<Message>> entry: msgs.entrySet()) {
List<Message> list=entry.getValue();
if(list.isEmpty())
@@ -2318,7 +2330,7 @@ private void sendBundledMessages(final Map<SingletonAddress,List<Message>> msgs)
bundler_out_stream.reset();
bundler_dos.reset();
writeMessageList(dest, src_addr, cluster_name, list, bundler_dos, multicast, id); // flushes output stream when done
- Buffer buffer=new Buffer(bundler_out_stream.getRawBuffer(), 0, bundler_out_stream.size());
+ Buffer buffer=bundler_out_stream.getBuffer();
doSend(buffer, dest, multicast);
}
catch(Throwable e) {
@@ -2363,8 +2375,180 @@ public String toString() {
}
}
+ public class SenderSendsBundler implements Bundler {
+ /** Keys are destinations, values are lists of Messages */
+ protected Map<SingletonAddress,List<Message>> msgs=createHashMap();
+ @GuardedBy("lock")
+ long count=0; // current number of bytes accumulated
+ final ReentrantLock lock=new ReentrantLock();
+
+ protected final ExposedByteArrayOutputStream bundler_out_stream=new ExposedByteArrayOutputStream(1024);
+ protected final ExposedDataOutputStream bundler_dos=new ExposedDataOutputStream(bundler_out_stream);
+
+ // current senders adding messages to the bundler
+ final AtomicInteger num_senders=new AtomicInteger(0);
+
+ public void start() {}
+ public void stop() {}
+
+
+
+
+ public void send(Message msg) throws Exception {
+ long length=msg.size();
+ checkLength(length);
+ num_senders.incrementAndGet();
+
+ lock.lock();
+ try {
+ num_senders.decrementAndGet();
+
+ if(count + length >= max_bundle_size)
+ sendMap();
+
+ // at this point, we haven't sent our message yet !
+ if(num_senders.get() == 0) { // no other sender threads present at this time
+ if(count == 0)
+ sendSingleMessage(msg);
+ else {
+ addMessage(msg, length);
+ sendMap();
+ }
+ }
+ else // there are other sender threads waiting, so our message will be sent by a different thread
+ addMessage(msg, length);
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+
+ @GuardedBy("lock")
+ protected Map<SingletonAddress,List<Message>> createHashMap() {
+ return new HashMap<SingletonAddress,List<Message>>(4);
+ }
+
+ @GuardedBy("lock")
+ protected void sendMap() {
+ sendBundledMessages(msgs, count);
+ msgs.clear();
+ count=0;
+ }
+
+
+ @GuardedBy("lock")
+ private void addMessage(Message msg, long length) {
+ String cluster_name;
+
+ if(!isSingleton())
+ cluster_name=TP.this.channel_name;
+ else {
+ TpHeader hdr=(TpHeader)msg.getHeader(id);
+ cluster_name=hdr.channel_name;
+ }
+
+ SingletonAddress dest=new SingletonAddress(cluster_name, msg.getDest());
+
+ List<Message> tmp=msgs.get(dest);
+ if(tmp == null) {
+ tmp=new LinkedList<Message>();
+ msgs.put(dest, tmp);
+ }
+ tmp.add(msg);
+ count+=length;
+ }
+
+
+ /**
+ * Sends all messages from the map, all messages for the same destination are bundled into 1 message.
+ * This method may be called by timer and bundler concurrently
+ * @param msgs
+ */
+ private void sendBundledMessages(final Map<SingletonAddress,List<Message>> msgs, long count) {
+ if(log.isTraceEnabled()) {
+ double percentage=100.0 / max_bundle_size * count;
+ StringBuilder sb=new StringBuilder(local_addr + ": sending ").append(numMessages(msgs)).append(" msgs (");
+ sb.append(count).append(" bytes (").append(f.format(percentage)).append("% of max_bundle_size)");
+ sb.append(" to ").append(msgs.size()).append(" destination(s)");
+ if(msgs.size() > 1) sb.append(" (dests=").append(msgs.keySet()).append(")");
+ log.trace(sb);
+ }
+
+ for(Map.Entry<SingletonAddress,List<Message>> entry: msgs.entrySet()) {
+ List<Message> list=entry.getValue();
+ if(list.isEmpty())
+ continue;
+ SingletonAddress dst=entry.getKey();
+ Address dest=dst.getAddress();
+ boolean multicast=dest == null;
+ bundler_out_stream.reset();
+ bundler_dos.reset();
+ if(list.size() == 1) {
+ Message msg=null;
+ try {
+ msg=list.get(0);
+ writeMessage(msg, bundler_dos, multicast);
+ Buffer buf=bundler_out_stream.getBuffer();
+ doSend(buf, dest, multicast);
+ }
+ catch(Throwable e) {
+ log.error(Util.getMessage("SendFailure"),
+ local_addr, (dest == null? "cluster" : dest), msg.size(), e.toString(), msg.printHeaders());
+ }
+ }
+ else {
+ try {
+ Address src_addr=list.get(0).getSrc();
+ String cluster_name=dst.getClusterName();
+ writeMessageList(dest, src_addr, cluster_name, list, bundler_dos, multicast, id); // flushes output stream when done
+ Buffer buf=bundler_out_stream.getBuffer();
+ doSend(buf, dest, multicast);
+ }
+ catch(Throwable e) {
+ log.error(Util.getMessage("FailureSendingMsgBundle"), local_addr, e);
+ }
+ }
+ }
+ }
+
+ protected void sendSingleMessage(final Message msg) {
+ Address dest=msg.getDest();
+ boolean multicast=dest == null;
+
+ try {
+ bundler_out_stream.reset();
+ bundler_dos.reset();
+ writeMessage(msg, bundler_dos, multicast);
+ Buffer buf=bundler_out_stream.getBuffer();
+ doSend(buf, dest, multicast);
+ }
+ catch(Throwable e) {
+ log.error(Util.getMessage("SendFailure"),
+ local_addr, (dest == null? "cluster" : dest), msg.size(), e.toString(), msg.printHeaders());
+ }
+ }
+
+ protected int numMessages(final Map<SingletonAddress,List<Message>> msgs) {
+ int num=0;
+ Collection<List<Message>> values=msgs.values();
+ for(List<Message> list: values)
+ num+=list.size();
+ return num;
+ }
+
+
+ private void checkLength(long len) throws Exception {
+ if(len > max_bundle_size)
+ throw new Exception("message size (" + len + ") is greater than max bundling size (" + max_bundle_size +
+ "). Set the fragmentation/bundle size in FRAG and TP correctly");
+ }
+
+
+ }
+
/**
* This bundler adds all (unicast or multicast) messages to a queue until max size has been exceeded, but does send
@@ -2390,6 +2574,7 @@ public String toString() {
protected TransferQueueBundler(int capacity) {
if(capacity <=0) throw new IllegalArgumentException("Bundler capacity cannot be " + capacity);
buffer=new LinkedBlockingQueue<Message>(capacity);
+ // buffer=new ConcurrentLinkedBlockingQueue2<Message>(capacity);
threshold=(int)(capacity * .9); // 90% of capacity
}
@@ -2520,7 +2705,7 @@ protected void sendBundledMessages(final Map<SingletonAddress,List<Message>> msg
try {
msg=list.get(0);
writeMessage(msg, bundler_dos, multicast);
- Buffer buf=new Buffer(bundler_out_stream.getRawBuffer(), 0, bundler_out_stream.size());
+ Buffer buf=bundler_out_stream.getBuffer();
doSend(buf, dest, multicast);
}
catch(Throwable e) {
@@ -2531,7 +2716,7 @@ protected void sendBundledMessages(final Map<SingletonAddress,List<Message>> msg
else {
try {
writeMessageList(dest, src_addr, cluster_name, list, bundler_dos, multicast, id); // flushes output stream when done
- Buffer buf=new Buffer(bundler_out_stream.getRawBuffer(), 0, bundler_out_stream.size());
+ Buffer buf=bundler_out_stream.getBuffer();
doSend(buf, dest, multicast);
}
catch(Throwable e) {
View
22 src/org/jgroups/util/Average.java
@@ -1,5 +1,7 @@
package org.jgroups.util;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
* A class which uses a simple array (of configurable length) to store stats samples. The average is then taken as
* the sum of all values >= 0 (the array is initialized with -1 values) divided by the number of non -1 values. This is
@@ -10,20 +12,26 @@
* @since 3.4
*/
public class Average {
- protected final long[] samples;
+ protected final long[] samples;
+ protected AtomicInteger index=new AtomicInteger(0);
public Average() {
- this(10);
+ this(64);
}
public Average(int size) {
- samples=new long[size];
+ samples=new long[Util.getNextHigherPowerOfTwo(size)];
for(int i=0; i < samples.length; i++)
samples[i]=-1;
}
public void add(long sample) {
- samples[((int)Util.random(samples.length) -1)]=sample;
+ // samples[((int)Util.random(samples.length) -1)]=sample;
+ int tmp_index=index.getAndIncrement();
+ int real_index=tmp_index & (samples.length -1); // fast modulo operation
+ samples[real_index]=sample;
+ if(tmp_index >= samples.length)
+ index.set(0);
}
public double getAverage() {
@@ -39,4 +47,10 @@ public double getAverage() {
return num > 0? total / (double)num : 0;
}
+ public void clear() {
+ for(int i=0; i < samples.length; i++)
+ samples[i]=-1;
+ }
+
+
}
View
187 src/org/jgroups/util/ByteBufferInputStream.java
@@ -0,0 +1,187 @@
+package org.jgroups.util;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+
+/**
+ * Class using {@link ByteBuffer} and implementing {@link DataInput}.
+ * @author Bela Ban
+ * @since 3.5
+ */
+public class ByteBufferInputStream implements DataInput {
+ protected final ByteBuffer buf;
+
+ public ByteBufferInputStream(ByteBuffer buf) {
+ this.buf=buf;
+ }
+
+ public void readFully(byte[] b) throws IOException {
+ readFully(b, 0, b.length);
+ }
+
+ public void readFully(byte[] b, int off, int len) throws IOException {
+ buf.get(b, off, len);
+ }
+
+
+ public int skipBytes(int n) throws IOException {
+ int skip=Math.min(n, buf.limit() - buf.position());
+ buf.limit(buf.limit() + skip);
+ return skip;
+ }
+
+ public boolean readBoolean() throws IOException {
+ return buf.get() == 1;
+ }
+
+ public byte readByte() throws IOException {
+ return buf.get();
+ }
+
+ public int readUnsignedByte() throws IOException {
+ int ch=readByte();
+ if(ch < 0)
+ throw new EOFException();
+ return ch;
+ }
+
+ public short readShort() throws IOException {
+ return buf.getShort();
+ }
+
+ public int readUnsignedShort() throws IOException {
+ int retval=readShort();
+ if(retval < 0)
+ throw new EOFException();
+ return retval;
+ }
+
+ public char readChar() throws IOException {
+ return buf.getChar();
+ }
+
+ public int readInt() throws IOException {
+ return buf.getInt();
+ }
+
+ public long readLong() throws IOException {
+ return buf.getLong();
+ }
+
+ public float readFloat() throws IOException {
+ return buf.getFloat();
+ }
+
+ public double readDouble() throws IOException {
+ return buf.getDouble();
+ }
+
+ public String readLine() throws IOException {
+ char[] lineBuffer=new char[128];
+ char buffer[] = lineBuffer;
+
+ if (buffer == null) {
+ buffer = lineBuffer = new char[128];
+ }
+
+ int room = buffer.length;
+ int offset = 0;
+ int c;
+
+ loop: while (true) {
+ switch (c = readByte()) {
+ case -1:
+ case '\n':
+ break loop;
+
+ case '\r':
+ int c2 = readByte();
+ if ((c2 != '\n') && (c2 != -1))
+ ;
+ break loop;
+
+ default:
+ if (--room < 0) {
+ buffer = new char[offset + 128];
+ room = buffer.length - offset - 1;
+ System.arraycopy(lineBuffer, 0, buffer, 0, offset);
+ lineBuffer = buffer;
+ }
+ buffer[offset++] = (char) c;
+ break;
+ }
+ }
+ if ((c == -1) && (offset == 0)) {
+ return null;
+ }
+ return String.copyValueOf(buffer, 0, offset);
+ }
+
+ public String readUTF() throws IOException {
+ int utflen = readUnsignedShort();
+ byte[] bytearr = new byte[utflen];
+ char[] chararr = new char[utflen];
+
+ int c, char2, char3;
+ int count = 0;
+ int chararr_count=0;
+
+ readFully(bytearr,0,utflen);
+
+ while (count < utflen) {
+ c = (int) bytearr[count] & 0xff;
+ if (c > 127) break;
+ count++;
+ chararr[chararr_count++]=(char)c;
+ }
+
+ while (count < utflen) {
+ c = (int) bytearr[count] & 0xff;
+ switch (c >> 4) {
+ case 0: case 1: case 2: case 3: case 4: case 5: case 6: case 7:
+ /* 0xxxxxxx*/
+ count++;
+ chararr[chararr_count++]=(char)c;
+ break;
+ case 12: case 13:
+ /* 110x xxxx 10xx xxxx*/
+ count += 2;
+ if (count > utflen)
+ throw new UTFDataFormatException(
+ "malformed input: partial character at end");
+ char2 = (int) bytearr[count-1];
+ if ((char2 & 0xC0) != 0x80)
+ throw new UTFDataFormatException(
+ "malformed input around byte " + count);
+ chararr[chararr_count++]=(char)(((c & 0x1F) << 6) |
+ (char2 & 0x3F));
+ break;
+ case 14:
+ /* 1110 xxxx 10xx xxxx 10xx xxxx */
+ count += 3;
+ if (count > utflen)
+ throw new UTFDataFormatException(
+ "malformed input: partial character at end");
+ char2 = (int) bytearr[count-2];
+ char3 = (int) bytearr[count-1];
+ if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80))
+ throw new UTFDataFormatException(
+ "malformed input around byte " + (count-1));
+ chararr[chararr_count++]=(char)(((c & 0x0F) << 12) |
+ ((char2 & 0x3F) << 6) |
+ ((char3 & 0x3F) << 0));
+ break;
+ default:
+ /* 10xx xxxx, 1111 xxxx */
+ throw new UTFDataFormatException(
+ "malformed input around byte " + count);
+ }
+ }
+ // The number of chars produced may be less than utflen
+ return new String(chararr, 0, chararr_count);
+ }
+
+ public String toString() {
+ return buf.toString();
+ }
+}
View
143 src/org/jgroups/util/ByteBufferOutputStream.java
@@ -0,0 +1,143 @@
+package org.jgroups.util;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+
+/**
+ * Class using {@link ByteBuffer} and implementing {@link DataOutput}. Note that all instances need to pass the
+ * exact size of the marshalled object as a ByteBuffer cannot be expanded.
+ * @author Bela Ban
+ * @since 3.5
+ */
+public class ByteBufferOutputStream implements DataOutput {
+ protected final ByteBuffer buf;
+
+
+ public ByteBufferOutputStream(ByteBuffer buf) {
+ this.buf=buf;
+ }
+
+ public void reset() {buf.clear();}
+
+ public ByteBuffer getBuffer() {return buf;}
+
+ public Buffer getBufferAsBuffer() {
+ return new Buffer(buf.array(), buf.arrayOffset(), buf.position());
+ }
+
+
+ public void write(int b) throws IOException {
+ buf.put((byte)b);
+ }
+
+ public void write(byte[] b) throws IOException {
+ write(b,0,b.length);
+ }
+
+ public void write(byte[] b, int off, int len) throws IOException {
+ buf.put(b, off, len);
+ }
+
+ public void writeBoolean(boolean v) throws IOException {
+ write(v? 1 : 0);
+ }
+
+ public void writeByte(int v) throws IOException {
+ write(v);
+ }
+
+ public void writeShort(int v) throws IOException {
+ buf.putShort((short)v);
+ }
+
+ public void writeChar(int v) throws IOException {
+ buf.putChar((char)v);
+ }
+
+ public void writeInt(int v) throws IOException {
+ buf.putInt(v);
+ }
+
+ public void writeLong(long v) throws IOException {
+ buf.putLong(v);
+ }
+
+ public void writeFloat(float v) throws IOException {
+ buf.putFloat(v);
+ }
+
+ public void writeDouble(double v) throws IOException {
+ buf.putDouble(v);
+ }
+
+ public void writeBytes(String s) throws IOException {
+ int len=s.length();
+ for(int i = 0 ; i < len ; i++)
+ write((byte)s.charAt(i));
+ }
+
+ public void writeChars(String s) throws IOException {
+ int len=s.length();
+ for (int i = 0 ; i < len ; i++) {
+ int v = s.charAt(i);
+ write((v >>> 8) & 0xFF);
+ write((v >>> 0) & 0xFF);
+ }
+ }
+
+ public void writeUTF(String str) throws IOException {
+ int strlen = str.length();
+ int utflen = 0;
+ int c, count = 0;
+
+ /* use charAt instead of copying String to char array */
+ for (int i = 0; i < strlen; i++) {
+ c = str.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F)) {
+ utflen++;
+ } else if (c > 0x07FF) {
+ utflen += 3;
+ } else {
+ utflen += 2;
+ }
+ }
+
+ if (utflen > 65535)
+ throw new UTFDataFormatException(
+ "encoded string too long: " + utflen + " bytes");
+
+ byte[] bytearr=new byte[utflen+2];
+
+ bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
+ bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);
+
+ int i=0;
+ for (i=0; i<strlen; i++) {
+ c = str.charAt(i);
+ if (!((c >= 0x0001) && (c <= 0x007F))) break;
+ bytearr[count++] = (byte) c;
+ }
+
+ for (;i < strlen; i++){
+ c = str.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F)) {
+ bytearr[count++] = (byte) c;
+
+ } else if (c > 0x07FF) {
+ bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+ bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+ bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+ } else {
+ bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+ bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+ }
+ }
+ write(bytearr,0,utflen + 2);
+ }
+
+ public String toString() {
+ return buf.toString();
+ }
+}
View
182 src/org/jgroups/util/ConcurrentLinkedBlockingQueue2.java
@@ -0,0 +1,182 @@
+package org.jgroups.util;
+
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Attempt at writing a fast transfer queue, which is bounded. The take() method blocks until there is an element, but
+ * the offer() method drops the element and returns if the queue is full (doesn't block). <p/>
+ * The design assumes a number of producers but only <em>one</em> consumer. The consumer only blocks when the queue is
+ * empty (on the not-empty condition), the producers block when the queue is full (on the not-full condition). The
+ * producers increment a count atomically and if the count is greater than the capacity, they block on the not-full
+ * condition. The consumer decrements the condition and signals the not-full condition when the count is capacity -1
+ * (from capacity to capacity-1).
+ * The producers signal not-empty when the count is 1 (from 0 to 1)
+ *
+ * @author Bela Ban
+ * @since 3.5
+ */
+public class ConcurrentLinkedBlockingQueue2<T> extends ConcurrentLinkedQueue<T> implements BlockingQueue<T> {
+ private static final long serialVersionUID=2539983016900218313L;
+ protected final int capacity;
+ protected final AtomicInteger count=new AtomicInteger(0);
+
+ // not_empty is signalled by a producer when count went from 0 to 1
+ protected final Lock not_empty_lock=new ReentrantLock();
+ protected final Condition not_empty=not_empty_lock.newCondition();
+
+
+
+ public ConcurrentLinkedBlockingQueue2(int capacity) {
+ this.capacity=capacity;
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() {
+ System.out.println("** num_awaits=" + not_empty_awaits);
+ }
+ });
+
+ /* Thread thread=new Thread() {
+ public void run() {
+ while(true) {
+ System.out.println("*** size=" + size());
+ Util.sleep(1000);
+ }
+ }
+ };
+ thread.setDaemon(true);
+ thread.start();*/
+ }
+
+
+ int not_empty_awaits=0;
+
+
+ /**
+ * Drops elements if capacity has been reached. That's OK for the ThreadPoolExecutor as dropped messages
+ * will get retransmitted
+ * @param t
+ * @return
+ */
+ public boolean offer(T t) {
+ boolean retval=super.offer(t);
+ if(retval) count.incrementAndGet();
+ return retval;
+ }
+
+ public T take() throws InterruptedException {
+ T val=super.poll();
+ if(val != null) {
+ decrCount();
+ return val;
+ }
+
+ waitForNotEmpty();
+
+ // at this stage, we are guaranteed to have a value
+ val=super.poll();
+ if(val != null)
+ decrCount();
+ return val;
+ }
+
+
+
+ public T poll() {
+ T val=super.poll();
+ if(val != null)
+ decrCount();
+ return val;
+ }
+
+ public T poll(long timeout, TimeUnit unit) throws InterruptedException {
+ return null;
+ }
+
+ public boolean remove(Object o) {
+ boolean retval=super.remove(o);
+ if(retval)
+ decrCount();
+ return retval;
+ }
+
+ public int remainingCapacity() {
+ return capacity - size();
+ }
+
+ public int drainTo(Collection<? super T> c) {
+ int cnt=0;
+ if(c == null)
+ return cnt;
+
+ for(;;) {
+ T el=poll();
+ if(el == null)
+ break;
+ c.add(el);
+ cnt++;
+ }
+
+ count.set(0);
+
+ return cnt;
+ }
+
+
+ public void put(T t) throws InterruptedException {
+ if(super.offer(t))
+ incrCount();
+ }
+
+ public boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {
+ return offer(t);
+ }
+
+ public int size() {
+ return count.get();
+ }
+
+ public int drainTo(Collection<? super T> c, int maxElements) {
+ return drainTo(c);
+ }
+
+
+ protected void waitForNotEmpty() throws InterruptedException {
+ while(count.get() == 0) {
+ not_empty_lock.lock();
+ try {
+ // System.out.println("-----> waiting for not empty: num_awaits=" + ++not_empty_awaits + ", count=" + count);
+ if(count.get() > 0)
+ return;
+ not_empty_awaits++;
+ not_empty.await();
+ }
+ finally {
+ not_empty_lock.unlock();
+ }
+ }
+ }
+
+
+ protected void decrCount() {
+ count.getAndDecrement();
+ }
+
+ protected void incrCount() {
+ int prev_count=count.getAndIncrement();
+ if(prev_count == 0) {
+ not_empty_lock.lock();
+ try {
+ not_empty.signal(); // not signalAll() as there is only *one* consumer !
+ }
+ finally {
+ not_empty_lock.unlock();
+ }
+ }
+ }
+}
View
7 src/org/jgroups/util/ExposedByteArrayOutputStream.java
@@ -7,7 +7,8 @@
/**
* Extends ByteArrayOutputStream, but exposes the internal buffer. This way we don't need to call
- * toByteArray() which copies the internal buffer
+ * toByteArray() which copies the internal buffer. This class is unsynchronized and needs to be
+ * synchronized externally.
* @author Bela Ban
*/
public class ExposedByteArrayOutputStream extends ByteArrayOutputStream {
@@ -138,7 +139,7 @@ public int size() {
* @since JDK1.1
*/
public String toString() {
- return new String(buf, 0, count);
+ return Util.printBytes(count);
}
/**
@@ -160,7 +161,7 @@ public String toString() {
*/
public String toString(String charsetName)
throws UnsupportedEncodingException {
- return new String(buf, 0, count, charsetName);
+ return Util.printBytes(count);
}
View
7 src/org/jgroups/util/ExposedDataOutputStream.java
@@ -31,7 +31,7 @@ public OutputStream getOutputStream() {
public void write(int b) throws IOException {
out.write(b);
- incCount(1);
+ incrementCount(1);
}
/**
@@ -48,11 +48,12 @@ public void write(int b) throws IOException {
public void write(byte b[], int off, int len)
throws IOException {
out.write(b, off, len);
- incCount(len);
+ incrementCount(len);
}
- private void incCount(int value) {
+ // Overwrites incCount() of super class, which is private
+ protected void incrementCount(int value) {
int temp=written + value;
if(temp < 0) {
temp=Integer.MAX_VALUE;
View
105 src/org/jgroups/util/Pool.java
@@ -0,0 +1,105 @@
+package org.jgroups.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Manages a fixed pool of resources (e.g. buffers). Uses the fast try-lock operation to get a resource from the pool,
+ * or returns a newly created resource. When the returned lock is unlocks, that resource becomes available again
+ * for consumption
+ * @author Bela Ban
+ * @since 3.5
+ */
+public class Pool<T> {
+ protected final T[] pool;
+ protected final Lock[] locks;
+ protected final Creator<T> creator;
+
+ public interface Creator<T> {
+ T create();
+ }
+
+ @SuppressWarnings("unchecked")
+ public Pool(int capacity, Creator<T> creator) {
+ this.creator=creator;
+ this.pool=(T[])new Object[Util.getNextHigherPowerOfTwo(capacity)];
+ this.locks=new Lock[pool.length];
+ for(int i=0; i < locks.length; i++)
+ locks[i]=new ReentrantLock();
+ }
+
+ public T[] getElements() {return pool;}
+
+ public int getNumLocked() {
+ int retval=0;
+ for(Lock lock: locks)
+ if(((ReentrantLock)lock).isLocked())
+ retval++;
+ return retval;
+ }
+
+
+ /**
+ * Gets the next available resource for which the lock can be acquired and returns it and its associated
+ * lock, which needs to be released when the caller is done using the resource. If no resource in the pool can be
+ * locked, returns a newly created resource and a null lock. This means that no lock was acquired and thus
+ * doesn't need to be released.
+ * @return An Element with T and a lock (possibly null if newly created)
+ */
+ public Element<T> get() {
+ // start at a random index, so different threads don't all start at index 0 and compete for the lock
+ int starting_index=((int)(Math.random() * pool.length)) & (pool.length - 1);
+ for(int i=0; i < locks.length; i++) {
+ int index=(starting_index + i) & (pool.length -1);
+ Lock lock=locks[index];
+ if(lock.tryLock()) {
+ if(pool[index] != null)
+ return new Element<T>(pool[index], lock);
+ return new Element<T>(pool[index]=creator.create(), lock);
+ }
+ }
+ return new Element<T>(creator.create(), null);
+ }
+
+
+ public String toString() {
+ StringBuilder sb=new StringBuilder();
+ int locked=getNumLocked();
+ sb.append("capacity=" + pool.length + ", locked=" + locked + ", available=" + (pool.length - locked));
+ return sb.toString();
+ }
+
+ public class Element<T> {
+ protected final T element;
+ protected final Lock lock;
+
+ public Element(T element, Lock lock) {
+ this.element=element;
+ this.lock=lock;
+ }
+
+ public T getElement() {return element;}
+ public Lock getLock() {return lock;}
+ public String toString() {return element + ", " + lock;}
+ }
+
+
+
+ public static void main(String[] args) {
+ final int length=8;
+ final Map<Integer,Integer> map=new HashMap<Integer,Integer>(length);
+ for(int i=0; i < 100; i++) {
+ int index=((int)(Math.random() * length)) & (length - 1);
+ System.out.println("index = " + index);
+ Integer count=map.get(index);
+ if(count == null)
+ map.put(index, 1);
+ else
+ map.put(index, ++count);
+ }
+ System.out.println("map = " + map);
+ }
+
+}
View
4 src/org/jgroups/util/UUID.java
@@ -112,6 +112,10 @@ public static String get(Address logical_addr) {
return cache.get(logical_addr);
}
+ public static Address getByName(String logical_name) {
+ return cache.getByValue(logical_name);
+ }
+
/** Returns a <em>copy</em> of the cache's contents */
public static Map<Address,String> getContents() {
return cache.contents();
View
40 tests/junit-functional/org/jgroups/tests/ByteBufferOutputStreamTest.java
@@ -0,0 +1,40 @@
+package org.jgroups.tests;
+
+import org.jgroups.Address;
+import org.jgroups.Global;
+import org.jgroups.Message;
+import org.jgroups.protocols.pbcast.NakAckHeader2;
+import org.jgroups.util.ByteBufferInputStream;
+import org.jgroups.util.ByteBufferOutputStream;
+import org.jgroups.util.Util;
+import org.testng.annotations.Test;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Tests {@link ByteBufferOutputStream}
+ * @author Bela Ban
+ * @since 3.5
+ */
+@Test(groups=Global.FUNCTIONAL)
+public class ByteBufferOutputStreamTest {
+
+ public void testConstruction() throws Exception {
+ Address dest=Util.createRandomAddress("A");
+ Message msg=new Message(dest, "hello world")
+ .setFlag(Message.Flag.DONT_BUNDLE,Message.Flag.OOB).putHeader((short)22, NakAckHeader2.createMessageHeader(322649));
+ int size=(int)(msg.size());
+ ByteBuffer buf=ByteBuffer.allocate(size);
+ ByteBufferOutputStream out=new ByteBufferOutputStream(buf);
+ msg.writeTo(out);
+
+ buf.flip();
+ byte[] array=new byte[buf.limit()];
+ System.arraycopy(buf.array(), buf.arrayOffset(), array, 0, buf.limit());
+ ByteBufferInputStream in=new ByteBufferInputStream(ByteBuffer.wrap(array));
+ Message copy=new Message(false);
+ copy.readFrom(in);
+ System.out.println("copy = " + copy);
+ assert msg.getDest() != null && msg.getDest().equals(dest);
+ }
+}
View
96 tests/junit-functional/org/jgroups/tests/PoolTest.java
@@ -0,0 +1,96 @@
+package org.jgroups.tests;
+
+import org.jgroups.Global;
+import org.jgroups.util.Pool;
+import org.jgroups.util.Util;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * @author Bela Ban
+ * @since 3.5
+ */
+@Test(groups=Global.FUNCTIONAL)
+public class PoolTest {
+
+
+ public void testSimpleGet() throws Exception {
+ Pool<Integer> pool=new Pool<Integer>(4, new Pool.Creator<Integer>() {
+ public Integer create() {return (int)Util.random(10);}
+ });
+ List<Lock> locks=grab(pool, 3);
+ System.out.println("pool = " + pool);
+ assert pool.getNumLocked() == 3;
+ assert locks.size() == 3;
+ }
+
+
+ public void testGet() throws Exception {
+ Pool<Integer> pool=new Pool<Integer>(4, new Pool.Creator<Integer>() {
+ public Integer create() {return (int)Util.random(10);}
+ });
+
+ List<Lock> locks=grab(pool, 3);
+ System.out.println("pool: " + pool);
+
+ assert pool.getNumLocked() == 3;
+ locks.addAll(grab(pool,3));
+
+ assert locks.size() == 4;
+
+ Pool.Element el=pool.get();
+ assert el.getLock() == null;
+ }
+
+ public void testGetAndUnlock() {
+ Pool<Integer> pool=new Pool<Integer>(4, new Pool.Creator<Integer>() {
+ public Integer create() {return (int)Util.random(10);}
+ });
+ List<Lock> locks=new ArrayList<Lock>(4);
+ for(int i=0; i < 4; i++)
+ locks.add(pool.get().getLock());
+ System.out.println("locks = " + locks);
+
+ // the same thread can lock the same lock multiple times
+ assert pool.getNumLocked() > 0 && pool.getNumLocked() <= 4;
+ for(Lock lock: locks)
+ lock.unlock();
+ System.out.println("pool = " + pool);
+ assert pool.getNumLocked() == 0;
+ }
+
+
+ protected static <T>List<Lock> grab(Pool<T> pool, int num) throws Exception {
+ List<Lock> retval=new ArrayList<Lock>(num);
+ for(int i=0; i < num; i++) {
+ Grabber grabber=new Grabber(pool);
+ grabber.start();
+ grabber.join();
+ retval.addAll(grabber.getLocks());
+ }
+ return retval;
+ }
+
+
+ protected static class Grabber<T> extends Thread {
+ protected final Pool<T> pool;
+ protected final List<Lock> locks=new ArrayList<Lock>();
+
+ public Grabber(Pool<T> pool) {
+ this.pool=pool;
+ }
+
+ public List<Lock> getLocks() {
+ return locks;
+ }
+
+ public void run() {
+ Pool.Element element=pool.get();
+ if(element.getLock() != null)
+ locks.add(element.getLock());
+ }
+ }
+}
View
235 tests/other/org/jgroups/tests/UnicastTest.java
@@ -2,15 +2,16 @@
package org.jgroups.tests;
import org.jgroups.*;
-import org.jgroups.stack.Protocol;
import org.jgroups.jmx.JmxConfigurator;
import org.jgroups.protocols.UNICAST;
import org.jgroups.protocols.UNICAST2;
+import org.jgroups.stack.Protocol;
import org.jgroups.util.Util;
-import org.jgroups.util.Streamable;
import javax.management.MBeanServer;
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@@ -20,34 +21,41 @@
*
* @author Bela Ban
*/
-public class UnicastTest extends ReceiverAdapter {
- private JChannel channel;
- private final MyReceiver receiver=new MyReceiver();
- static final String groupname="UnicastTest-Group";
- private long sleep_time=0;
- private boolean exit_on_end=false, busy_sleep=false, oob=false;
- private int num_threads=1;
- private int num_msgs=100000, msg_size=1000;
+public class UnicastTest {
+ protected JChannel channel;
+ protected final MyReceiver receiver=new MyReceiver();
+ protected long sleep_time=0;
+ protected boolean busy_sleep=false, oob=false, dont_bundle=false;
+ protected int num_threads=1;
+ protected int num_msgs=100000, msg_size=1000;
+
+ protected static final byte START = 1; // | num_msgs (long) |
+ protected static final byte DATA = 2; // | length (int) | data (byte[]) |
+ public void init(Protocol[] props, long sleep_time, boolean busy_sleep, String name) throws Exception {
+ _init(new JChannel(props), sleep_time, busy_sleep, name);
+ }
+ public void init(String props, long sleep_time, boolean busy_sleep, String name) throws Exception {
+ _init(new JChannel(props), sleep_time, busy_sleep, name);
+ }
- public void init(String props, long sleep_time, boolean exit_on_end, boolean busy_sleep, String name) throws Exception {
+ protected void _init(JChannel ch, long sleep_time, boolean busy_sleep, String name) throws Exception {
this.sleep_time=sleep_time;
- this.exit_on_end=exit_on_end;
this.busy_sleep=busy_sleep;
- channel=new JChannel(props);
+ channel=ch;
if(name != null)
channel.setName(name);
- channel.connect(groupname);
+ channel.connect(getClass().getSimpleName());
channel.setReceiver(receiver);
try {
MBeanServer server=Util.getMBeanServer();
- JmxConfigurator.registerChannel(channel, server, "jgroups", channel.getClusterName(), true);
+ JmxConfigurator.registerChannel(channel, server, "jgroups-" + name, channel.getClusterName(), true);
}
catch(Throwable ex) {
- System.err.println("registering the channel in JMX failed: " + ex);
+ System.err.println("registering the channel with JMX failed: " + ex);
}
}
@@ -59,7 +67,7 @@ public void eventLoop() throws Exception {
System.out.print("[1] Send msgs [2] Print view [3] Print conns [4] Trash conn [5] Trash all conns" +
"\n[6] Set sender threads (" + num_threads + ") [7] Set num msgs (" + num_msgs + ") " +
"[8] Set msg size (" + Util.printBytes(msg_size) + ")" +
- "\n[o] Toggle OOB (" + oob + ")\n[q] Quit\n");
+ "\n[o] Toggle OOB (" + oob + ") [b] Toggle dont_bundle (" + dont_bundle + ")\n[q] Quit\n");
System.out.flush();
c=System.in.read();
switch(c) {
@@ -93,6 +101,10 @@ public void eventLoop() throws Exception {
oob=!oob;
System.out.println("oob=" + oob);
break;
+ case 'b':
+ dont_bundle=!dont_bundle;
+ System.out.println("dont_bundle = " + dont_bundle);
+ break;
case 'q':
channel.close();
return;
@@ -102,7 +114,7 @@ public void eventLoop() throws Exception {
}
}
- private void printConnections() {
+ protected void printConnections() {
Protocol prot=channel.getProtocolStack().findProtocol(Util.getUnicastProtocols());
if(prot instanceof UNICAST)
System.out.println(((UNICAST)prot).printConnections());
@@ -110,7 +122,7 @@ else if(prot instanceof UNICAST2)
System.out.println(((UNICAST2)prot).printConnections());
}
- private void removeConnection() {
+ protected void removeConnection() {
Address member=getReceiver();
if(member != null) {
Protocol prot=channel.getProtocolStack().findProtocol(Util.getUnicastProtocols());
@@ -121,7 +133,7 @@ else if(prot instanceof UNICAST2)
}
}
- private void removeAllConnections() {
+ protected void removeAllConnections() {
Protocol prot=channel.getProtocolStack().findProtocol(Util.getUnicastProtocols());
if(prot instanceof UNICAST)
((UNICAST)prot).removeAllConnections();
@@ -144,8 +156,8 @@ void sendMessages() throws Exception {
System.out.println("sending " + num_msgs + " messages (" + Util.printBytes(msg_size) +
") to " + destination + ": oob=" + oob + ", " + num_threads + " sender thread(s)");
- byte[] buf=Util.objectToByteBuffer(new StartData(num_msgs));
- Message msg=new Message(destination, null, buf);
+ ByteBuffer buf=ByteBuffer.allocate(Global.BYTE_SIZE + Global.LONG_SIZE).put(START).putLong(num_msgs);
+ Message msg=new Message(destination, buf.array());
channel.send(msg);
long print=num_msgs / 10;
@@ -188,14 +200,9 @@ void printView() {
- private Address getReceiver() {
- List<Address> mbrs=null;
- int index;
- BufferedReader reader;
- String tmp;
-
+ protected Address getReceiver() {
try {
- mbrs=channel.getView().getMembers();
+ List<Address> mbrs=channel.getView().getMembers();
System.out.println("pick receiver from the following members:");
int i=0;
for(Address mbr: mbrs) {
@@ -207,9 +214,9 @@ private Address getReceiver() {
}
System.out.flush();
System.in.skip(System.in.available());
- reader=new BufferedReader(new InputStreamReader(System.in));
- tmp=reader.readLine().trim();
- index=Integer.parseInt(tmp);
+ BufferedReader reader=new BufferedReader(new InputStreamReader(System.in));
+ String tmp=reader.readLine().trim();
+ int index=Integer.parseInt(tmp);
return mbrs.get(index); // index out of bounds caught below
}
catch(Exception e) {
@@ -221,7 +228,6 @@ private Address getReceiver() {
public static void main(String[] args) {
long sleep_time=0;
- boolean exit_on_end=false;
boolean busy_sleep=false;
String props=null;
String name=null;
@@ -236,10 +242,6 @@ public static void main(String[] args) {
sleep_time=Long.parseLong(args[++i]);
continue;
}
- if("-exit_on_end".equals(args[i])) {
- exit_on_end=true;
- continue;
- }
if("-busy_sleep".equals(args[i])) {
busy_sleep=true;
continue;
@@ -255,7 +257,7 @@ public static void main(String[] args) {
try {
UnicastTest test=new UnicastTest();
- test.init(props, sleep_time, exit_on_end, busy_sleep, name);
+ test.init(props, sleep_time, busy_sleep, name);
test.eventLoop();
}
catch(Exception ex) {
@@ -265,88 +267,31 @@ public static void main(String[] args) {
static void help() {
System.out.println("UnicastTest [-help] [-props <props>] [-sleep <time in ms between msg sends] " +
- "[-exit_on_end] [-busy-sleep] [-name name]");
- }
-
-
- public abstract static class Data implements Streamable {
+ "[-busy-sleep] [-name name]");
}
- public static class StartData extends Data {
- long num_values=0;
- public StartData() {}
-
- StartData(long num_values) {
- this.num_values=num_values;
- }
-
- public void writeTo(DataOutput out) throws Exception {
- out.writeLong(num_values);
- }
-
- public void readFrom(DataInput in) throws Exception {
- num_values=in.readLong();
- }
- }
-
- public static class Value extends Data {
- long value=0;
- byte[] buf=null;
-
- public Value() {
- }
-
- Value(long value, int len) {
- this.value=value;
- if(len > 0)
- this.buf=new byte[len];
- }
-
-
- public void writeTo(DataOutput out) throws Exception {
- out.writeLong(value);
- if(buf != null) {
- out.writeInt(buf.length);
- out.write(buf, 0, buf.length);
- }
- else {
- out.writeInt(0);
- }
- }
-
- public void readFrom(DataInput in) throws Exception {
- value=in.readLong();
- int len=in.readInt();
- if(len > 0) {
- buf=new byte[len];
- in.readFully(buf, 0, len);
- }
- }
- }
-
- private class Sender extends Thread {
- private final int number_of_msgs;
- private final int message_size;
- private final Address destination;
- private final int print;
+ protected class Sender extends Thread {
+ protected final int number_of_msgs;
+ protected final Address destination;
+ protected final int print;
+ protected final byte[] buf;
public Sender(int num_msgs, int msg_size, Address destination, int print) {
this.number_of_msgs=num_msgs;
- this.message_size=msg_size;
this.destination=destination;
this.print=print;
+ buf=ByteBuffer.allocate(Global.INT_SIZE + msg_size).put(DATA).array();
}
public void run() {
for(int i=1; i <= number_of_msgs; i++) {
- Value val=new Value(i, message_size);
- byte[] buf=new byte[0];
try {
- buf=Util.objectToByteBuffer(val);
- Message msg=new Message(destination, null, buf);
+ Message msg=new Message(destination, buf);
if(oob)
msg.setFlag(Message.Flag.OOB);
+ if(dont_bundle)
+ msg.setFlag(Message.Flag.DONT_BUNDLE);
if(i > 0 && print > 0 && i % print == 0)
System.out.println("-- sent " + i);
channel.send(msg);
@@ -361,59 +306,41 @@ public void run() {
}
- private class MyReceiver extends ReceiverAdapter {
- private boolean started=false;
- private long start=0, stop=0;
- private long tmp=0, num_values=0;
- private long total_time=0, msgs_per_sec, print;
- private AtomicLong current_value=new AtomicLong(0), total_bytes=new AtomicLong(0);
+ protected class MyReceiver extends ReceiverAdapter {
+ protected long start=0;
+ protected long print;
+ protected AtomicLong current_value=new AtomicLong(0), total_bytes=new AtomicLong(0);
public void receive(Message msg) {
- Data data;
- try {
- data=(Data)Util.objectFromByteBuffer(msg.getRawBuffer(), msg.getOffset(), msg.getLength());
- }
- catch(Exception e) {
- e.printStackTrace();
- return;
- }
-
- if(data instanceof StartData) {
- if(started) {
- System.err.println("UnicastTest.run(): received START data, but am already processing data");
- }
- else {
- started=true;
- current_value.set(0); // first value to be received
- tmp=0;
- num_values=((StartData)data).num_values;
- print=num_values / 10;
+ byte[] buf=msg.getRawBuffer();
+ byte type=buf[msg.getOffset()];
+
+ switch(type) {
+ case START:
+ ByteBuffer tmp=ByteBuffer.wrap(buf, 1, Global.LONG_SIZE);
+ num_msgs=(int)tmp.getLong();
+ print=num_msgs / 10;
+ current_value.set(0);
total_bytes.set(0);
start=System.currentTimeMillis();
- }
- }
- else
- if(data instanceof Value) {
- tmp=((Value)data).value;
-
- long new_val=current_value.incrementAndGet();
- if(((Value)data).buf != null)
- total_bytes.addAndGet(((Value)data).buf.length);
- if(print > 0 && new_val % print == 0)
- System.out.println("received " + current_value);
- if(new_val >= num_values) {
- stop=System.currentTimeMillis();
- total_time=stop - start;
- msgs_per_sec=(long)(num_values / (total_time / 1000.0));
- double throughput=total_bytes.get() / (total_time / 1000.0);
- System.out.println("-- received " + num_values + " messages (" + Util.printBytes(total_bytes.get()) +
- ") in " + total_time + " ms (" + msgs_per_sec + " messages/sec, " +
- Util.printBytes(throughput) + " / sec)");
- started=false;
- if(exit_on_end)
- System.exit(0);
- }
+ break;
+ case DATA:
+ long new_val=current_value.incrementAndGet();
+ total_bytes.addAndGet(msg.getLength() - Global.INT_SIZE);
+ if(print > 0 && new_val % print == 0)
+ System.out.println("received " + new_val);
+ if(new_val >= num_msgs) {
+ long time=System.currentTimeMillis() - start;
+ double msgs_sec=(current_value.get() / (time / 1000.0));
+ double throughput=total_bytes.get() / (time / 1000.0);
+ System.out.println(String.format("\nreceived %d messages in %d ms (%.2f msgs/sec), throughput=%s",
+ current_value.get(), time, msgs_sec, Util.printBytes(throughput)));
+ break;
+ }
+ break;
+ default:
+ System.err.println("Type " + type + " is invalid");
}
}
View
44 tests/other/org/jgroups/tests/UnicastTestSharedLoopback.java
@@ -0,0 +1,44 @@
+package org.jgroups.tests;
+
+import org.jgroups.protocols.PING;
+import org.jgroups.protocols.SHARED_LOOPBACK;
+import org.jgroups.protocols.UNICAST3;
+import org.jgroups.protocols.pbcast.GMS;
+import org.jgroups.protocols.pbcast.NAKACK2;
+import org.jgroups.protocols.pbcast.STABLE;
+import org.jgroups.stack.Protocol;
+
+/**
+ * Tests UnicastTest with SHARED_LOOPBACK and 2 UnicastTest instances
+ * @author Bela Ban
+ * @since 3.5
+ */
+public class UnicastTestSharedLoopback {
+ public static void main(String[] args) throws Exception {
+
+
+
+ UnicastTest a=new UnicastTest();
+ a.init(props(), 0, false, "A");
+
+ UnicastTest b=new UnicastTest();
+ b.init(props(), 0, false, "B");
+
+
+ a.eventLoop();
+ }
+
+
+ protected static Protocol[] props() {
+ return new Protocol[]{
+ new SHARED_LOOPBACK()
+ .setValue("loopback", false)
+ .setValue("bundler_type", "sender-sends")
+ .setValue("ignore_dont_bundle", false),
+ new PING(),
+ new NAKACK2(),
+ new UNICAST3().setValue("conn_expiry_timeout", 0).setValue("conn_close_timeout", 0),
+ new STABLE(),
+ new GMS()};
+ }
+}
View
327 tests/other/org/jgroups/tests/UnicastTestTcp.java
@@ -0,0 +1,327 @@
+
+package org.jgroups.tests;
+
+import org.jgroups.Global;
+import org.jgroups.Message;
+import org.jgroups.PhysicalAddress;
+import org.jgroups.Version;
+import org.jgroups.stack.IpAddress;
+import org.jgroups.util.DefaultSocketFactory;
+import org.jgroups.util.Util;
+
+import java.io.*;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+/**
+ * Same as UnicastTest, but uses pure TCP instead of JGroups
+ *
+ * @author Bela Ban
+ */
+public class UnicastTestTcp {
+ protected boolean oob=false, dont_bundle=true;
+ protected int num_threads=1;
+ protected int num_msgs=100000, msg_size=1000;
+ protected InetSocketAddress local, remote;
+ protected PhysicalAddress destination;
+ protected Socket sock;
+ protected DataOutputStream output;
+ protected final Lock output_lock=new ReentrantLock();
+ protected ServerSocket srv_sock;
+ protected Acceptor acceptor;
+ protected long start=0, stop=0;
+ protected long total_time=0, msgs_per_sec, print;
+ protected AtomicLong current_value=new AtomicLong(0), total_bytes=new AtomicLong(0);
+
+ protected static final boolean TCP_NODELAY=false;
+ protected static final int SOCK_SEND_BUF_SIZE=200 * 1000;
+ protected static final int SOCK_RECV_BUF_SIZE=200 * 1000;
+ protected static final byte START = 1; // | num_msgs (long) |
+ protected static final byte DATA = 2; // | length (int) | data (byte[]) |
+
+
+
+
+ public void init(String local_addr, String remote_addr, int local_port, int remote_port) throws Exception {
+ local=new InetSocketAddress(local_addr, local_port);
+ remote=new InetSocketAddress(remote_addr, remote_port);
+ destination=new IpAddress(remote.getAddress(), remote.getPort());
+ srv_sock=Util.createServerSocket(new DefaultSocketFactory(), "server", local.getAddress(), local.getPort());
+ System.out.println("Listening on " + srv_sock.getLocalSocketAddress());
+ acceptor=new Acceptor();
+ acceptor.start();
+
+ sock=new Socket();
+ //sock.bind(local);
+ sock.setSendBufferSize(SOCK_SEND_BUF_SIZE);
+ sock.setReceiveBufferSize(SOCK_RECV_BUF_SIZE);
+ try {
+ sock.connect(remote);
+ output=new DataOutputStream(new BufferedOutputStream(sock.getOutputStream()));
+ System.out.println("Connected to " + sock.getRemoteSocketAddress());
+ }
+ catch(Throwable t) {
+ System.out.println("Failed connecting to " + remote + ": will only act as server");
+ }
+ }
+
+
+ public void eventLoop() throws Exception {
+ int c;
+
+ while(true) {
+ System.out.print("[1] Send msgs " +
+ "\n[6]Set sender threads (" + num_threads + ") [7] Set num msgs (" + num_msgs + ") " +