Skip to content

Commit

Permalink
- Added Profiler
Browse files Browse the repository at this point in the history
- Removed almost all JVM options from jgroups.sh
- Created transports (UDP, Server, NIO, TCP, JGroups) for RoundTrip
- Replaced LinkedBlockingQueue with ArrayBlockingQueue in regular/oob/internal thread pools
- Added percentiles to AverageMinMax
- Added -use-wall-clock to RoundTrip
- Added BundlerStressTest
- Use G1 GC by default
- Replaced BlockingQueue.poll() in TransferQueueBundler with BlockingQueue.drainTo() (https://issues.jboss.org/browse/JGRP-2076)
- Correct use of CountDownLatch
- Renamed RingBuffer to RingBufferSeqno and RingBufferLockless to RingBufferSeqnoLockless
- First impl of RingBufferBundler
- RingBufferBundler now accesses the RingBuffer directly
- UPerf: targets list if pre-allocated rather than creating a new one on every PUT
- Exposing bundler_wait_strategy and bundler_num_spins in TP. Can be changed at runtime
- RingBufferBundler now uses only 2 iterations through the messages for bundling (instead of 3)
- Added RingBufferBundlerLockless
- Added RingBufferLockless2
- RingBuffer: use Condition.signalAll() instead of signal() to wake up all writers
- Added RingBufferBundlerLockless
- Added PaddedAtomic{Long,Integer,Boolean} to prevent false sharing
- Added size() to Bundler
  • Loading branch information
belaban committed Jul 1, 2016
1 parent 6a87397 commit e75fb59
Show file tree
Hide file tree
Showing 54 changed files with 4,093 additions and 2,394 deletions.
22 changes: 6 additions & 16 deletions bin/jgroups.sh
Expand Up @@ -30,26 +30,16 @@ fi;
#JG_FLAGS="-Djgroups.bind_addr=match-address:192.168.1.*" #JG_FLAGS="-Djgroups.bind_addr=match-address:192.168.1.*"
JG_FLAGS="$JG_FLAGS -Djava.net.preferIPv4Stack=true" JG_FLAGS="$JG_FLAGS -Djava.net.preferIPv4Stack=true"
FLAGS="-server -Xmx1G -Xms500M" FLAGS="-server -Xmx1G -Xms500M"
FLAGS="$FLAGS -XX:CompileThreshold=10000 -XX:ThreadStackSize=64K -XX:SurvivorRatio=8" #GC="-XX:+UseParNewGC -XX:+UseConcMarkSweepGC" ## concurrent mark and sweep (CMS) collector
FLAGS="$FLAGS -XX:TargetSurvivorRatio=90 -XX:MaxTenuringThreshold=15" #GC="-XX:+UseG1GC -XX:MaxGCPauseMillis=200"
FLAGS="$FLAGS -Xshare:off" #GC="-XX:+UseG1GC -XX:+UseStringDeduplication"
# FLAGS="$FLAGS -XX:+UseStringDeduplication" ## JDK 8u20 GC="-XX:+UseG1GC"
#GC="-XX:+UseG1GC" ## use at least JDK 8
GC="-XX:+UseParNewGC -XX:+UseConcMarkSweepGC" ## concurrent mark and sweep (CMS) collector


JMX="-Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost" JMX="-Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost"
#EXPERIMENTAL="-XX:+UseFastAccessorMethods -XX:+UseTLAB"


#EXPERIMENTAL="$EXPERIMENTAL -XX:+DoEscapeAnalysis -XX:+EliminateLocks -XX:+UseBiasedLocking" #java -Xrunhprof:cpu=samples,monitor=y,interval=5,lineno=y,thread=y -classpath $CP $LOG $JG_FLAGS $FLAGS $JMX $*
EXPERIMENTAL="$EXPERIMENTAL -XX:+EliminateLocks -XX:+UseBiasedLocking"

#EXPERIMENTAL="$EXPERIMENTAL -XX:+AggressiveOpts -XX:+DoEscapeAnalysis -XX:+EliminateLocks -XX:+UseBiasedLocking -XX:+UseCompressedOops"
#EXPERIMENTAL="$EXPERIMENTAL -XX:+UnlockExperimentalVMOptions -XX:+UseG1GC"

#java -Xrunhprof:cpu=samples,monitor=y,interval=5,lineno=y,thread=y -classpath $CP $LOG $JG_FLAGS $FLAGS $EXPERIMENTAL $JMX $*


#DEBUG="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5000" #DEBUG="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5000"
#JMC="-XX:+UnlockCommercialFeatures -XX:+FlightRecorder"


java -cp $CP $DEBUG $LOG $GC $JG_FLAGS $FLAGS $EXPERIMENTAL $JMX $JMC $* java -verbose:gc -cp $CP $DEBUG $LOG $GC $JG_FLAGS $FLAGS $JMX $JMC $*


Expand Up @@ -3,7 +3,7 @@
# an element more than once # an element more than once


RULE Remove1 RULE Remove1
CLASS RingBufferLockless CLASS RingBufferSeqnoLockless
METHOD remove(boolean) METHOD remove(boolean)
AT WRITE hd AT WRITE hd
IF true IF true
Expand All @@ -13,7 +13,7 @@ DO System.out.println("---> Remover: waiting on \"remove\"");
ENDRULE ENDRULE


RULE Remove2 RULE Remove2
CLASS RingBufferLockless CLASS RingBufferSeqnoLockless
METHOD remove(boolean) METHOD remove(boolean)
AT EXIT AT EXIT
IF true IF true
Expand All @@ -22,7 +22,7 @@ DO System.out.println("---> Remover: signalling \"add\"");
ENDRULE ENDRULE


RULE Add RULE Add
CLASS RingBufferLockless CLASS RingBufferSeqnoLockless
METHOD add(long,java.lang.Object,boolean) METHOD add(long,java.lang.Object,boolean)
AFTER INVOKE RingBuffer.index(long) AFTER INVOKE RingBuffer.index(long)
IF !createCountDown("tmp", 4) && countDown("tmp") IF !createCountDown("tmp", 4) && countDown("tmp")
Expand Down
Expand Up @@ -6,7 +6,7 @@
# Creates a rendezvous for 7 threads (6 Adders and 1 Remover) # Creates a rendezvous for 7 threads (6 Adders and 1 Remover)
# #
RULE Create rendezvous RULE Create rendezvous
CLASS RingBufferLockless CLASS RingBufferSeqnoLockless
METHOD <init> METHOD <init>
IF TRUE IF TRUE
DO createRendezvous($0, 7, true); DO createRendezvous($0, 7, true);
Expand All @@ -18,7 +18,7 @@ ENDRULE
# removed elements, and then rendezvous again, allowing the Adders to set the elements again # removed elements, and then rendezvous again, allowing the Adders to set the elements again
# #
RULE Remove1 RULE Remove1
CLASS RingBufferLockless CLASS RingBufferSeqnoLockless
METHOD removeMany(java.util.concurrent.atomic.AtomicBoolean,boolean,int) METHOD removeMany(java.util.concurrent.atomic.AtomicBoolean,boolean,int)
AT WRITE hd AT WRITE hd
IF $3 == 0 IF $3 == 0
Expand All @@ -30,7 +30,7 @@ ENDRULE
# the rendezvous again, allowing the Remover to null all elements, before the Adders are unblocked again. # the rendezvous again, allowing the Remover to null all elements, before the Adders are unblocked again.
# #
RULE Add RULE Add
CLASS RingBufferLockless CLASS RingBufferSeqnoLockless
METHOD add(long,java.lang.Object,boolean) METHOD add(long,java.lang.Object,boolean)
AFTER READ hd AFTER READ hd
IF $3 IF $3
Expand All @@ -44,7 +44,7 @@ ENDRULE
# (scenario #1) # (scenario #1)
# #
RULE Remove2 RULE Remove2
CLASS RingBufferLockless CLASS RingBufferSeqnoLockless
METHOD removeMany(boolean,int) METHOD removeMany(boolean,int)
AT EXIT AT EXIT
IF $2 == 0 IF $2 == 0
Expand Down
File renamed without changes.
13 changes: 6 additions & 7 deletions src/org/jgroups/JChannel.java
Expand Up @@ -359,8 +359,7 @@ public synchronized JChannel connect(String cluster_name) throws Exception {
protected synchronized JChannel connect(String cluster_name, boolean useFlushIfPresent) throws Exception { protected synchronized JChannel connect(String cluster_name, boolean useFlushIfPresent) throws Exception {
if(!_preConnect(cluster_name)) if(!_preConnect(cluster_name))
return this; return this;
Event connect_event=useFlushIfPresent? new Event(Event.CONNECT_USE_FLUSH, cluster_name) Event connect_event=new Event(useFlushIfPresent? Event.CONNECT_USE_FLUSH : Event.CONNECT, cluster_name);
: new Event(Event.CONNECT, cluster_name);
_connect(connect_event); _connect(connect_event);
state=State.CONNECTED; state=State.CONNECTED;
notifyChannelConnected(this); notifyChannelConnected(this);
Expand Down Expand Up @@ -407,8 +406,7 @@ public synchronized JChannel connect(String cluster_name, Address target, long t


boolean canFetchState=false; boolean canFetchState=false;
try { try {
Event connect_event=useFlushIfPresent? new Event(Event.CONNECT_WITH_STATE_TRANSFER_USE_FLUSH, cluster_name) Event connect_event=new Event(useFlushIfPresent? Event.CONNECT_WITH_STATE_TRANSFER_USE_FLUSH : Event.CONNECT_WITH_STATE_TRANSFER, cluster_name);
: new Event(Event.CONNECT_WITH_STATE_TRANSFER, cluster_name);
_connect(connect_event); _connect(connect_event);
state=State.CONNECTED; state=State.CONNECTED;
notifyChannelConnected(this); notifyChannelConnected(this);
Expand Down Expand Up @@ -482,9 +480,9 @@ public synchronized void close() {
* @exception IllegalStateException thrown if the channel is disconnected or closed * @exception IllegalStateException thrown if the channel is disconnected or closed
*/ */
public JChannel send(Message msg) throws Exception { public JChannel send(Message msg) throws Exception {
checkClosedOrNotConnected();
if(msg == null) if(msg == null)
throw new NullPointerException("msg is null"); throw new NullPointerException("msg is null");
checkClosedOrNotConnected();
down(new Event(Event.MSG, msg)); down(new Event(Event.MSG, msg));
return this; return this;
} }
Expand Down Expand Up @@ -1037,9 +1035,10 @@ protected JChannel checkClosed() {
} }


protected JChannel checkClosedOrNotConnected() { protected JChannel checkClosedOrNotConnected() {
if(state == State.CLOSED) State tmp=state;
if(tmp == State.CLOSED)
throw new IllegalStateException("channel is closed"); throw new IllegalStateException("channel is closed");
if(!(state == State.CONNECTING || state == State.CONNECTED)) if(!(tmp == State.CONNECTING || tmp == State.CONNECTED))
throw new IllegalStateException("channel is disconnected"); throw new IllegalStateException("channel is disconnected");
return this; return this;
} }
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/JChannelProbeHandler.java
Expand Up @@ -99,7 +99,7 @@ protected void handleJmx(Map<String, String> map, String input) {
Class<?> type=setter instanceof ResourceDMBean.FieldAccessor? Class<?> type=setter instanceof ResourceDMBean.FieldAccessor?
((ResourceDMBean.FieldAccessor)setter).getField().getType() : ((ResourceDMBean.FieldAccessor)setter).getField().getType() :
setter instanceof ResourceDMBean.MethodAccessor? setter instanceof ResourceDMBean.MethodAccessor?
((ResourceDMBean.MethodAccessor)setter).getMethod().getParameterTypes()[0].getClass() : null; ((ResourceDMBean.MethodAccessor)setter).getMethod().getParameterTypes()[0] : null;
Object converted_value=Util.convert(attrvalue, type); Object converted_value=Util.convert(attrvalue, type);
setter.invoke(converted_value); setter.invoke(converted_value);
} }
Expand Down
21 changes: 10 additions & 11 deletions src/org/jgroups/protocols/BaseBundler.java
Expand Up @@ -54,21 +54,30 @@ public void viewChange(View view) {
} }
} }


public int size() {
int num=0;
Collection<List<Message>> values=msgs.values();
for(List<Message> list: values)
num+=list.size();
return num;
}

/** /**
* Sends all messages in the map. Messages for the same destination are bundled into a message list. * Sends all messages in the map. Messages for the same destination are bundled into a message list.
* The map will be cleared when done. * The map will be cleared when done.
*/ */
protected void sendBundledMessages() { protected void sendBundledMessages() {
if(log.isTraceEnabled()) { if(log.isTraceEnabled()) {
double percentage=100.0 / transport.getMaxBundleSize() * count; double percentage=100.0 / transport.getMaxBundleSize() * count;
log.trace(BUNDLE_MSG, transport.localAddress(), numMessages(), count, percentage, msgs.size(), msgs.keySet()); log.trace(BUNDLE_MSG, transport.localAddress(), size(), count, percentage, msgs.size(), msgs.keySet());
} }


for(Map.Entry<Address,List<Message>> entry: msgs.entrySet()) { for(Map.Entry<Address,List<Message>> entry: msgs.entrySet()) {
List<Message> list=entry.getValue(); List<Message> list=entry.getValue();
if(list.isEmpty()) if(list.isEmpty())
continue; continue;


output.position(0);
if(list.size() == 1) if(list.size() == 1)
sendSingleMessage(list.get(0)); sendSingleMessage(list.get(0));
else { else {
Expand All @@ -86,19 +95,10 @@ protected void sendBundledMessages() {
msgs.values().stream().filter(list -> list != null).forEach(List::clear); msgs.values().stream().filter(list -> list != null).forEach(List::clear);
} }


protected int numMessages() {
int num=0;
Collection<List<Message>> values=msgs.values();
for(List<Message> list: values)
num+=list.size();
return num;
}



protected void sendSingleMessage(final Message msg) { protected void sendSingleMessage(final Message msg) {
Address dest=msg.getDest(); Address dest=msg.getDest();
try { try {
output.position(0);
Util.writeMessage(msg, output, dest == null); Util.writeMessage(msg, output, dest == null);
transport.doSend(output.buffer(), 0, output.position(), dest); transport.doSend(output.buffer(), 0, output.position(), dest);
if(transport.statsEnabled()) if(transport.statsEnabled())
Expand All @@ -118,7 +118,6 @@ protected void sendSingleMessage(final Message msg) {


protected void sendMessageList(final Address dest, final Address src, final List<Message> list) { protected void sendMessageList(final Address dest, final Address src, final List<Message> list) {
try { try {
output.position(0);
Util.writeMessageList(dest, src, transport.cluster_name.chars(), list, output, dest == null, transport.getId()); Util.writeMessageList(dest, src, transport.cluster_name.chars(), list, output, dest == null, transport.getId());
transport.doSend(output.buffer(), 0, output.position(), dest); transport.doSend(output.buffer(), 0, output.position(), dest);
} }
Expand Down
3 changes: 3 additions & 0 deletions src/org/jgroups/protocols/Bundler.java
Expand Up @@ -20,4 +20,7 @@ default void init(@SuppressWarnings("UnusedParameters") TP transport) {}
void send(Message msg) throws Exception; void send(Message msg) throws Exception;
@SuppressWarnings("UnusedParameters") @SuppressWarnings("UnusedParameters")
default void viewChange(View view) {} default void viewChange(View view) {}

/** The number of unsent messages in the bundler */
int size();
} }
60 changes: 43 additions & 17 deletions src/org/jgroups/protocols/NoBundler.java
Expand Up @@ -7,37 +7,64 @@
import org.jgroups.util.Util; import org.jgroups.util.Util;


import java.net.SocketException; import java.net.SocketException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.BlockingQueue;

import java.util.stream.IntStream;
import static org.jgroups.protocols.TP.MSG_OVERHEAD;


/** /**
* Bundler which doesn't bundle :-) Can be used to measure the diff between bundling and non-bundling (e.g. at runtime) * Bundler which doesn't bundle :-) Can be used to measure the diff between bundling and non-bundling (e.g. at runtime)
* @author Bela Ban * @author Bela Ban
* @since 4.0 * @since 4.0
*/ */
public class NoBundler implements Bundler { public class NoBundler implements Bundler {
protected TP transport; protected TP transport;
protected final ReentrantLock lock=new ReentrantLock(); protected Log log;
protected Log log; protected int pool_size=10;
protected final ByteArrayDataOutputStream output=new ByteArrayDataOutputStream(1024); protected BlockingQueue<ByteArrayDataOutputStream> buf_pool;
protected int initial_buf_size=512;

// protected final Profiler send=new Profiler("nb.send", TimeUnit.MICROSECONDS);


public int size() {return 0;}
public int initialBufSize() {return initial_buf_size;}
public NoBundler initialBufSize(int s) {this.initial_buf_size=s; return this;}
public int poolSize() {return pool_size;}


public NoBundler poolSize(int s) {
if(s == pool_size) return this;
pool_size=s;
BlockingQueue<ByteArrayDataOutputStream> new_pool=new ArrayBlockingQueue<>(pool_size);
BlockingQueue<ByteArrayDataOutputStream> tmp=buf_pool;
buf_pool=new_pool;
if(tmp != null)
tmp.clear();
return this;
}


public void init(TP transport) { public void init(TP transport) {
this.transport=transport; this.transport=transport;
log=transport.getLog(); log=transport.getLog();
buf_pool=new ArrayBlockingQueue<>(pool_size);
IntStream.rangeClosed(1, pool_size).forEach(ignored -> buf_pool.offer(new ByteArrayDataOutputStream(initial_buf_size)));
// transport.registerProbeHandler(send);
} }
public void start() {} public void start() {}
public void stop() {} public void stop() {}


public void send(Message msg) throws Exception { public void send(Message msg) throws Exception {
lock.lock(); ByteArrayDataOutputStream out=null;
try { try {
sendSingleMessage(msg, output != null? output : new ByteArrayDataOutputStream((int)(msg.size() + MSG_OVERHEAD))); out=buf_pool.poll();
if(out == null) {
out=new ByteArrayDataOutputStream(initial_buf_size);
log.warn("created new output buffer as pool was empty");
}
sendSingleMessage(msg, out);
} }
finally { finally {
lock.unlock(); if(out != null)
buf_pool.offer(out);
} }
} }


Expand All @@ -47,9 +74,14 @@ protected void sendSingleMessage(final Message msg, final ByteArrayDataOutputStr
try { try {
output.position(0); output.position(0);
Util.writeMessage(msg, output, dest == null); Util.writeMessage(msg, output, dest == null);
//long start=Util.micros();
transport.doSend(output.buffer(), 0, output.position(), dest); transport.doSend(output.buffer(), 0, output.position(), dest);
//long time_us=Util.micros()-start;
if(transport.statsEnabled()) if(transport.statsEnabled())
transport.incrSingleMsgsInsteadOfBatches(); transport.incrSingleMsgsInsteadOfBatches();
//synchronized(send) {
// send.add(time_us);
//}
} }
catch(SocketException sock_ex) { catch(SocketException sock_ex) {
log.trace(Util.getMessage("SendFailure"), log.trace(Util.getMessage("SendFailure"),
Expand All @@ -61,10 +93,4 @@ protected void sendSingleMessage(final Message msg, final ByteArrayDataOutputStr
} }
} }



protected static class Output {
protected final AtomicBoolean available=new AtomicBoolean(true);
protected final ByteArrayDataOutputStream output=new ByteArrayDataOutputStream(1024);
}

} }

0 comments on commit e75fb59

Please sign in to comment.