Skip to content

Commit

Permalink
TcpConnection: only the last writer flushes the output stream
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Feb 6, 2017
1 parent 80a3733 commit 5c6023c
Showing 1 changed file with 14 additions and 9 deletions.
23 changes: 14 additions & 9 deletions src/org/jgroups/blocks/cs/TcpConnection.java
Expand Up @@ -11,6 +11,7 @@
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/**
Expand All @@ -26,6 +27,7 @@ public class TcpConnection extends Connection {
protected DataInputStream in;
protected volatile Receiver receiver;
protected final TcpBaseServer server;
protected final AtomicInteger writers=new AtomicInteger(0); // to determine the last writer to flush

/** Creates a connection stub and binds it, use {@link #connect(Address)} to connect */
public TcpConnection(Address peer_addr, TcpBaseServer server) throws Exception {
Expand Down Expand Up @@ -117,15 +119,20 @@ public void start() {
* @param length
*/
public void send(byte[] data, int offset, int length) throws Exception {
if(out == null)
return;
writers.incrementAndGet();
send_lock.lock();
try {
doSend(data, offset, length, true, true);
doSend(data, offset, length);
updateLastAccessed();
}
catch(InterruptedException iex) {
Thread.currentThread().interrupt(); // set interrupt flag again
}
finally {
if(writers.decrementAndGet() == 0) // only the last active writer thread calls flush()
flush(); // won't throw an exception
send_lock.unlock();
}
}
Expand All @@ -145,19 +152,17 @@ public void send(ByteBuffer buf) throws Exception {
}


protected void doSend(byte[] data, int offset, int length, boolean acquire_lock, boolean flush) throws Exception {
if(out == null)
return;
protected void doSend(byte[] data, int offset, int length) throws Exception {
out.writeInt(length); // write the length of the data buffer first
out.write(data,offset,length);
if(!flush || (acquire_lock && send_lock.hasQueuedThreads()))
return; // don't flush as some of the waiting threads will do the flush, or flush is false
out.flush(); // may not be very efficient (but safe)
}

protected void flush() throws Exception {
if(out != null)
protected void flush() {
try {
out.flush();
}
catch(Throwable t) {
}
}

protected BufferedOutputStream createBufferedOutputStream(OutputStream out) {
Expand Down

0 comments on commit 5c6023c

Please sign in to comment.