Skip to content

Commit

Permalink
First (untested) pass at moving APR writes to SocketWrapper
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1650271 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
markt-asf committed Jan 8, 2015
1 parent 5a6f780 commit 6881e17
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 289 deletions.
161 changes: 8 additions & 153 deletions java/org/apache/coyote/http11/InternalAprOutputBuffer.java
Expand Up @@ -19,14 +19,9 @@


import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;


import org.apache.coyote.Response; import org.apache.coyote.Response;
import org.apache.tomcat.jni.Socket; import org.apache.tomcat.jni.Socket;
import org.apache.tomcat.jni.Status;
import org.apache.tomcat.util.buf.ByteBufferHolder;
import org.apache.tomcat.util.net.AbstractEndpoint; import org.apache.tomcat.util.net.AbstractEndpoint;
import org.apache.tomcat.util.net.AprEndpoint; import org.apache.tomcat.util.net.AprEndpoint;
import org.apache.tomcat.util.net.SocketWrapperBase; import org.apache.tomcat.util.net.SocketWrapperBase;
Expand Down Expand Up @@ -76,6 +71,7 @@ public void init(SocketWrapperBase<Long> socketWrapper) {
this.endpoint = socketWrapper.getEndpoint(); this.endpoint = socketWrapper.getEndpoint();


Socket.setsbb(this.socket, socketWriteBuffer); Socket.setsbb(this.socket, socketWriteBuffer);
socketWrapper.socketWriteBuffer = socketWriteBuffer;
} }




Expand All @@ -99,166 +95,25 @@ public void recycle() {
@Override @Override
public void sendAck() throws IOException { public void sendAck() throws IOException {
if (!committed) { if (!committed) {
if (Socket.send(socket, Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length) < 0) addToBB(Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length);
if (flushBuffer(true)) {
throw new IOException(sm.getString("iob.failedwrite.ack")); throw new IOException(sm.getString("iob.failedwrite.ack"));
}
}


// ------------------------------------------------------ Protected Methods

@Override
protected synchronized void addToBB(byte[] buf, int offset, int length)
throws IOException {

if (length == 0) return;

// If bbuf is currently being used for writes, add this data to the
// write buffer
if (writeBufferFlipped) {
addToBuffers(buf, offset, length);
return;
}

// Keep writing until all the data is written or a non-blocking write
// leaves data in the buffer
while (length > 0) {
int thisTime = length;
if (socketWriteBuffer.position() == socketWriteBuffer.capacity()) {
if (flushBuffer(isBlocking())) {
break;
}
}
if (thisTime > socketWriteBuffer.capacity() - socketWriteBuffer.position()) {
thisTime = socketWriteBuffer.capacity() - socketWriteBuffer.position();
} }
socketWriteBuffer.put(buf, offset, thisTime);
length = length - thisTime;
offset = offset + thisTime;
}

if (!isBlocking() && length>0) {
// Buffer the remaining data
addToBuffers(buf, offset, length);
} }
} }




private void addToBuffers(byte[] buf, int offset, int length) { // ------------------------------------------------------ Protected Methods
ByteBufferHolder holder = bufferedWrites.peekLast();
if (holder==null || holder.isFlipped() || holder.getBuf().remaining()<length) {
ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length));
holder = new ByteBufferHolder(buffer,false);
bufferedWrites.add(holder);
}
holder.getBuf().put(buf,offset,length);
}



@Override @Override
protected synchronized boolean flushBuffer(boolean block) protected synchronized void addToBB(byte[] buf, int offset, int length) throws IOException {
throws IOException { socketWrapper.write(isBlocking(), buf, offset, length);

if (hasMoreDataToFlush()) {
writeToSocket(block);
}

if (bufferedWrites.size() > 0) {
Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator();
while (!hasMoreDataToFlush() && bufIter.hasNext()) {
ByteBufferHolder buffer = bufIter.next();
buffer.flip();
while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) {
transfer(buffer.getBuf(), socketWriteBuffer);
if (buffer.getBuf().remaining() == 0) {
bufIter.remove();
}
writeToSocket(block);
//here we must break if we didn't finish the write
}
}
}

return hasMoreDataToFlush();
} }




private synchronized void writeToSocket(boolean block) throws IOException {

Lock readLock = socketWrapper.getBlockingStatusReadLock();
WriteLock writeLock = socketWrapper.getBlockingStatusWriteLock();

readLock.lock();
try {
if (socketWrapper.getBlockingStatus() == block) {
writeToSocket();
return;
}
} finally {
readLock.unlock();
}

writeLock.lock();
try {
// Set the current settings for this socket
socketWrapper.setBlockingStatus(block);
if (block) {
Socket.timeoutSet(socket, endpoint.getSoTimeout() * 1000);
} else {
Socket.timeoutSet(socket, 0);
}

// Downgrade the lock
readLock.lock();
try {
writeLock.unlock();
writeToSocket();
} finally {
readLock.unlock();
}
} finally {
// Should have been released above but may not have been on some
// exception paths
if (writeLock.isHeldByCurrentThread()) {
writeLock.unlock();
}
}
}

private synchronized void writeToSocket() throws IOException {
if (!writeBufferFlipped) {
writeBufferFlipped = true;
socketWriteBuffer.flip();
}

int written;

do {
written = Socket.sendbb(socket, socketWriteBuffer.position(), socketWriteBuffer.remaining());
if (Status.APR_STATUS_IS_EAGAIN(-written)) {
written = 0;
} else if (written < 0) {
throw new IOException("APR error: " + written);
}
socketWriteBuffer.position(socketWriteBuffer.position() + written);
} while (written > 0 && socketWriteBuffer.hasRemaining());

if (socketWriteBuffer.remaining() == 0) {
socketWriteBuffer.clear();
writeBufferFlipped = false;
}
// If there is data left in the buffer the socket will be registered for
// write further up the stack. This is to ensure the socket is only
// registered for write once as both container and user code can trigger
// write registration.
}


//-------------------------------------------------- Non-blocking IO methods

@Override @Override
protected synchronized boolean hasMoreDataToFlush() { protected boolean flushBuffer(boolean block) throws IOException {
return super.hasMoreDataToFlush(); return socketWrapper.flush(block);
} }




Expand Down
85 changes: 41 additions & 44 deletions java/org/apache/tomcat/util/net/AprEndpoint.java
Expand Up @@ -2505,11 +2505,8 @@ public void close() {




@Override @Override
public void write(boolean block, byte[] b, int off, int len) throws IOException { protected int doWrite(ByteBuffer bytebuffer, boolean block, boolean flip)
doWrite(block, b, off, len); throws IOException {
}

private void doWrite(boolean block, byte[] b, int off, int len) throws IOException {
if (closed) { if (closed) {
throw new IOException(sm.getString("apr.closed", getSocket())); throw new IOException(sm.getString("apr.closed", getSocket()));
} }
Expand All @@ -2520,8 +2517,7 @@ private void doWrite(boolean block, byte[] b, int off, int len) throws IOExcepti
readLock.lock(); readLock.lock();
try { try {
if (getBlockingStatus() == block) { if (getBlockingStatus() == block) {
doWriteInternal(b, off, len); return doWriteInternal(bytebuffer, flip);
return;
} }
} finally { } finally {
readLock.unlock(); readLock.unlock();
Expand All @@ -2541,8 +2537,7 @@ private void doWrite(boolean block, byte[] b, int off, int len) throws IOExcepti
readLock.lock(); readLock.lock();
try { try {
writeLock.unlock(); writeLock.unlock();
doWriteInternal(b, off, len); return doWriteInternal(bytebuffer, flip);
return;
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
Expand All @@ -2556,57 +2551,66 @@ private void doWrite(boolean block, byte[] b, int off, int len) throws IOExcepti
} }




private int doWriteInternal(byte[] b, int off, int len) throws IOException { private int doWriteInternal(ByteBuffer bytebuffer, boolean flip)
throws IOException {
if (flip) {
bytebuffer.flip();
writeBufferFlipped = true;
}


int start = off; int written = 0;
int left = len; int thisTime;
int written;


do { do {
thisTime = 0;
if (getEndpoint().isSSLEnabled()) { if (getEndpoint().isSSLEnabled()) {
if (sslOutputBuffer.remaining() == 0) { if (sslOutputBuffer.remaining() == 0) {
// Buffer was fully written last time around // Buffer was fully written last time around
sslOutputBuffer.clear(); sslOutputBuffer.clear();
if (left < SSL_OUTPUT_BUFFER_SIZE) { transfer(bytebuffer, sslOutputBuffer);
sslOutputBuffer.put(b, start, left);
} else {
sslOutputBuffer.put(b, start, SSL_OUTPUT_BUFFER_SIZE);
}
sslOutputBuffer.flip(); sslOutputBuffer.flip();
thisTime = sslOutputBuffer.remaining();
} else { } else {
// Buffer still has data from previous attempt to write // Buffer still has data from previous attempt to write
// APR + SSL requires that exactly the same parameters are // APR + SSL requires that exactly the same parameters are
// passed when re-attempting the write // passed when re-attempting the write
} }
written = Socket.sendb(getSocket().longValue(), sslOutputBuffer, int sslWritten = Socket.sendb(getSocket().longValue(), sslOutputBuffer,
sslOutputBuffer.position(), sslOutputBuffer.limit()); sslOutputBuffer.position(), sslOutputBuffer.limit());
if (written > 0) { if (sslWritten > 0) {
sslOutputBuffer.position( sslOutputBuffer.position(
sslOutputBuffer.position() + written); sslOutputBuffer.position() + sslWritten);
} }
} else { } else {
written = Socket.send(getSocket().longValue(), b, start, left); thisTime = Socket.sendb(getSocket().longValue(), bytebuffer,
bytebuffer.position(), bytebuffer.limit() - bytebuffer.position());
} }
if (Status.APR_STATUS_IS_EAGAIN(-written)) { if (Status.APR_STATUS_IS_EAGAIN(-thisTime)) {
written = 0; thisTime = 0;
} else if (-written == Status.APR_EOF) { } else if (-thisTime == Status.APR_EOF) {
throw new EOFException(sm.getString("apr.clientAbort")); throw new EOFException(sm.getString("socket.apr.clientAbort"));
} else if ((OS.IS_WIN32 || OS.IS_WIN64) && } else if ((OS.IS_WIN32 || OS.IS_WIN64) &&
(-written == Status.APR_OS_START_SYSERR + 10053)) { (-thisTime == Status.APR_OS_START_SYSERR + 10053)) {
// 10053 on Windows is connection aborted // 10053 on Windows is connection aborted
throw new EOFException(sm.getString("apr.clientAbort")); throw new EOFException(sm.getString("socket.apr.clientAbort"));
} else if (written < 0) { } else if (thisTime < 0) {
throw new IOException(sm.getString("apr.write.error", throw new IOException(sm.getString("socket.apr.write.error",
Integer.valueOf(-written), getSocket(), this)); Integer.valueOf(-thisTime), getSocket(), this));
} }
start += written; written += thisTime;
left -= written; bytebuffer.position(bytebuffer.position() + thisTime);
} while (written > 0 && left > 0); } while (thisTime > 0 && bytebuffer.hasRemaining());


if (left > 0) { if (bytebuffer.remaining() == 0) {
((AprEndpoint) getEndpoint()).getPoller().add(getSocket().longValue(), -1, false, true); bytebuffer.clear();
writeBufferFlipped = false;
} }
return len - left; // If there is data left in the buffer the socket will be registered for
// write further up the stack. This is to ensure the socket is only
// registered for write once as both container and user code can trigger
// write registration.

return written;
} }




Expand All @@ -2615,12 +2619,5 @@ public void regsiterForEvent(boolean read, boolean write) {
((AprEndpoint) getEndpoint()).getPoller().add( ((AprEndpoint) getEndpoint()).getPoller().add(
getSocket().longValue(), -1, read, write); getSocket().longValue(), -1, read, write);
} }


@Override
public boolean flush(boolean block) throws IOException {
// TODO Auto-generated method stub
return false;
}
} }
} }
7 changes: 7 additions & 0 deletions java/org/apache/tomcat/util/net/Nio2Endpoint.java
Expand Up @@ -1025,6 +1025,13 @@ public void write(boolean block, byte[] b, int off, int len) throws IOException
} }




@Override
protected int doWrite(ByteBuffer buffer, boolean block, boolean flip)
throws IOException {
// TODO Auto-generated method stub
return 0;
}

private int writeInternal(boolean block, byte[] b, int off, int len) private int writeInternal(boolean block, byte[] b, int off, int len)
throws IOException { throws IOException {
ByteBuffer writeBuffer = getSocket().getBufHandler().getWriteBuffer(); ByteBuffer writeBuffer = getSocket().getBufHandler().getWriteBuffer();
Expand Down

0 comments on commit 6881e17

Please sign in to comment.