Skip to content

Commit

Permalink
Move writes and associated buffers to SocketWrapper for NIO. NIO2/APR…
Browse files Browse the repository at this point in the history
… likely broken at this point.

git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1650269 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
markt-asf committed Jan 8, 2015
1 parent b7bc74c commit 5dfa1df
Show file tree
Hide file tree
Showing 9 changed files with 230 additions and 373 deletions.
132 changes: 12 additions & 120 deletions java/org/apache/coyote/ajp/AjpProcessor.java
Expand Up @@ -24,8 +24,6 @@
import java.security.NoSuchProviderException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.servlet.RequestDispatcher;
Expand All @@ -44,7 +42,6 @@
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.ExceptionUtils;
import org.apache.tomcat.util.buf.ByteBufferHolder;
import org.apache.tomcat.util.buf.ByteChunk;
import org.apache.tomcat.util.buf.HexUtils;
import org.apache.tomcat.util.buf.MessageBytes;
Expand Down Expand Up @@ -182,22 +179,6 @@ public class AjpProcessor<S> extends AbstractProcessor<S> {
protected final MessageBytes bodyBytes = MessageBytes.newInstance();


/**
* The max size of the buffered write buffer
*/
private int bufferedWriteSize = 64*1024; //64k default write buffer


/**
* For "non-blocking" writes use an external set of buffers. Although the
* API only allows one non-blocking write at a time, due to buffering and
* the possible need to write HTTP headers, there may be more than one write
* to the OutputBuffer.
*/
private final LinkedBlockingDeque<ByteBufferHolder> bufferedWrites =
new LinkedBlockingDeque<>();


/**
* Host name (used to avoid useless B2C conversion on the host name).
*/
Expand Down Expand Up @@ -605,7 +586,7 @@ public final void action(ActionCode actionCode, Object param) {
}
case NB_WRITE_INTEREST: {
AtomicBoolean isReady = (AtomicBoolean)param;
boolean result = bufferedWrites.size() == 0 && responseMsgPos == -1;
boolean result = !socketWrapper.hasDataToWrite() && responseMsgPos == -1;
isReady.set(result);
if (!result) {
registerForEvent(false, true);
Expand Down Expand Up @@ -647,7 +628,8 @@ public SocketState asyncDispatch(SocketStatus status) {
asyncStateMachine.asyncOperation();
try {
if (hasDataToWrite()) {
flushBufferedData();
boolean blocking = (response.getWriteListener() == null);
socketWrapper.flush(blocking);
if (hasDataToWrite()) {
// There is data to write but go via Response to
// maintain a consistent view of non-blocking state
Expand Down Expand Up @@ -755,7 +737,7 @@ public SocketState process(SocketWrapperBase<S> socket) throws IOException {
}
cping = true;
try {
output(pongMessageArray, 0, pongMessageArray.length, true);
socketWrapper.write(true, pongMessageArray, 0, pongMessageArray.length);
} catch (IOException e) {
setErrorState(ErrorState.CLOSE_NOW, e);
}
Expand Down Expand Up @@ -1053,7 +1035,7 @@ protected boolean refillReadBuffer(boolean block) throws IOException {

// Request more data immediately
if (!waitingForBodyMessage) {
output(getBodyMessageArray, 0, getBodyMessageArray.length, true);
socketWrapper.write(true, getBodyMessageArray, 0, getBodyMessageArray.length);
waitingForBodyMessage = true;
}

Expand Down Expand Up @@ -1460,7 +1442,7 @@ protected void prepareResponse() throws IOException {

// Write to buffer
responseMessage.end();
output(responseMessage.getBuffer(), 0, responseMessage.getLen(), true);
socketWrapper.write(true, responseMessage.getBuffer(), 0, responseMessage.getLen());
}


Expand All @@ -1473,7 +1455,7 @@ protected void flush(boolean explicit) throws IOException {
// TODO Validate the assertion above
if (explicit && !finished) {
// Send the flush message
output(flushMessageArray, 0, flushMessageArray.length, true);
socketWrapper.write(true, flushMessageArray, 0, flushMessageArray.length);
}
}

Expand Down Expand Up @@ -1505,22 +1487,13 @@ protected void finish() throws IOException {

// Add the end message
if (getErrorState().isError()) {
output(endAndCloseMessageArray, 0, endAndCloseMessageArray.length, true);
socketWrapper.write(true, endAndCloseMessageArray, 0, endAndCloseMessageArray.length);
} else {
output(endMessageArray, 0, endMessageArray.length, true);
socketWrapper.write(true, endMessageArray, 0, endMessageArray.length);
}
}


private int output(byte[] src, int offset, int length,
boolean block) throws IOException {
if (socketWrapper == null || socketWrapper.getSocket() == null)
return -1;

return socketWrapper.write(block, src, offset, length);
}


private boolean available() {
if (endOfStream) {
return false;
Expand Down Expand Up @@ -1569,15 +1542,12 @@ private void writeData(ByteChunk chunk) throws IOException {
socketWrapper.access();

boolean blocking = (response.getWriteListener() == null);
if (!blocking) {
flushBufferedData();
}

int len = chunk.getLength();
int off = 0;

// Write this chunk
while (responseMsgPos == -1 && len > 0) {
while (len > 0) {
int thisTime = len;
if (thisTime > outputMaxChunkSize) {
thisTime = outputMaxChunkSize;
Expand All @@ -1586,96 +1556,18 @@ private void writeData(ByteChunk chunk) throws IOException {
responseMessage.appendByte(Constants.JK_AJP13_SEND_BODY_CHUNK);
responseMessage.appendBytes(chunk.getBytes(), chunk.getOffset() + off, thisTime);
responseMessage.end();
writeResponseMessage(blocking);
socketWrapper.write(blocking, responseMessage.getBuffer(), 0, responseMessage.getLen());

len -= thisTime;
off += thisTime;
}

bytesWritten += off;

if (len > 0) {
// Add this chunk to the buffer
addToBuffers(chunk.getBuffer(), off, len);
}
}


private void addToBuffers(byte[] buf, int offset, int length) {
ByteBufferHolder holder = bufferedWrites.peekLast();
if (holder == null || holder.isFlipped() || holder.getBuf().remaining() < length) {
ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length));
holder = new ByteBufferHolder(buffer, false);
bufferedWrites.add(holder);
}
holder.getBuf().put(buf, offset, length);
}


private boolean hasDataToWrite() {
return responseMsgPos != -1 || bufferedWrites.size() > 0;
}


private void flushBufferedData() throws IOException {

if (responseMsgPos > -1) {
// Must be using non-blocking IO
// Partially written response message. Try and complete it.
writeResponseMessage(false);
}

while (responseMsgPos == -1 && bufferedWrites.size() > 0) {
// Try and write any remaining buffer data
Iterator<ByteBufferHolder> holders = bufferedWrites.iterator();
ByteBufferHolder holder = holders.next();
holder.flip();
ByteBuffer buffer = holder.getBuf();
int initialBufferSize = buffer.remaining();
while (responseMsgPos == -1 && buffer.remaining() > 0) {
transferToResponseMsg(buffer);
writeResponseMessage(false);
}
bytesWritten += (initialBufferSize - buffer.remaining());
if (buffer.remaining() == 0) {
holders.remove();
}
}
}


private void transferToResponseMsg(ByteBuffer buffer) {

int thisTime = buffer.remaining();
if (thisTime > outputMaxChunkSize) {
thisTime = outputMaxChunkSize;
}

responseMessage.reset();
responseMessage.appendByte(Constants.JK_AJP13_SEND_BODY_CHUNK);
buffer.get(responseMessage.getBuffer(), responseMessage.pos, thisTime);
responseMessage.end();
}


private void writeResponseMessage(boolean block) throws IOException {
int len = responseMessage.getLen();
int written = 1;
if (responseMsgPos == -1) {
// New message. Advance the write position to the beginning
responseMsgPos = 0;
}

while (written > 0 && responseMsgPos < len) {
written = output(
responseMessage.getBuffer(), responseMsgPos, len - responseMsgPos, block);
responseMsgPos += written;
}

// Message fully written, reset the position for a new message.
if (responseMsgPos == len) {
responseMsgPos = -1;
}
return responseMsgPos != -1 || socketWrapper.hasDataToWrite();
}


Expand Down
15 changes: 1 addition & 14 deletions java/org/apache/coyote/http11/AbstractOutputBuffer.java
Expand Up @@ -20,7 +20,6 @@
import java.nio.ByteBuffer;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingDeque;

import org.apache.coyote.ActionCode;
Expand Down Expand Up @@ -653,19 +652,7 @@ protected final boolean isReady() throws IOException {


public boolean hasDataToWrite() {
return hasMoreDataToFlush() || hasBufferedData();
}


protected boolean hasBufferedData() {
boolean result = false;
if (bufferedWrites!=null) {
Iterator<ByteBufferHolder> iter = bufferedWrites.iterator();
while (!result && iter.hasNext()) {
result = iter.next().hasData();
}
}
return result;
return socketWrapper.hasDataToWrite();
}


Expand Down
Expand Up @@ -417,7 +417,6 @@ public boolean hasDataToWrite() {
}
}

@Override
protected boolean hasBufferedData() {
return bufferedWrites.size() > 0;
}
Expand Down

0 comments on commit 5dfa1df

Please sign in to comment.