Skip to content

Commit

Permalink
Refactor to use a common socketWriteBuffer reference
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1648896 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
markt-asf committed Jan 1, 2015
1 parent 9e5df9e commit eb2bd57
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 63 deletions.
3 changes: 3 additions & 0 deletions java/org/apache/coyote/http11/AbstractOutputBuffer.java
Expand Up @@ -17,6 +17,7 @@
package org.apache.coyote.http11; package org.apache.coyote.http11;


import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.AccessController; import java.security.AccessController;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.util.Iterator; import java.util.Iterator;
Expand Down Expand Up @@ -99,6 +100,8 @@ public abstract class AbstractOutputBuffer<S> implements OutputBuffer {
*/ */
protected long byteCount = 0; protected long byteCount = 0;


protected ByteBuffer socketWriteBuffer;

/** /**
* For "non-blocking" writes use an external set of buffers. Although the * For "non-blocking" writes use an external set of buffers. Although the
* API only allows one non-blocking write at a time, due to buffering and * API only allows one non-blocking write at a time, due to buffering and
Expand Down
47 changes: 19 additions & 28 deletions java/org/apache/coyote/http11/InternalAprOutputBuffer.java
Expand Up @@ -50,9 +50,9 @@ public InternalAprOutputBuffer(Response response, int headerBufferSize) {
super(response, headerBufferSize); super(response, headerBufferSize);


if (headerBufferSize < (8 * 1024)) { if (headerBufferSize < (8 * 1024)) {
bbuf = ByteBuffer.allocateDirect(6 * 1500); socketWriteBuffer = ByteBuffer.allocateDirect(6 * 1500);
} else { } else {
bbuf = ByteBuffer.allocateDirect((headerBufferSize / 1500 + 1) * 1500); socketWriteBuffer = ByteBuffer.allocateDirect((headerBufferSize / 1500 + 1) * 1500);
} }


outputStreamOutputBuffer = new SocketOutputBuffer(); outputStreamOutputBuffer = new SocketOutputBuffer();
Expand All @@ -72,13 +72,7 @@ public InternalAprOutputBuffer(Response response, int headerBufferSize) {




/** /**
* Direct byte buffer used for writing. * <code>false</code> if socketWriteBuffer is ready to be written to and
*/
private final ByteBuffer bbuf;


/**
* <code>false</code> if bbuf is ready to be written to and
* <code>true</code> is ready to be read from. * <code>true</code> is ready to be read from.
*/ */
private volatile boolean flipped = false; private volatile boolean flipped = false;
Expand All @@ -97,7 +91,7 @@ public void init(SocketWrapperBase<Long> socketWrapper,
socket = socketWrapper.getSocket().longValue(); socket = socketWrapper.getSocket().longValue();
this.endpoint = endpoint; this.endpoint = endpoint;


Socket.setsbb(this.socket, bbuf); Socket.setsbb(this.socket, socketWriteBuffer);
} }




Expand All @@ -107,12 +101,9 @@ public void init(SocketWrapperBase<Long> socketWrapper,
*/ */
@Override @Override
public void recycle() { public void recycle() {

super.recycle(); super.recycle();

socketWriteBuffer.clear();
bbuf.clear();
flipped = false; flipped = false;

socket = 0; socket = 0;
wrapper = null; wrapper = null;
} }
Expand Down Expand Up @@ -149,7 +140,7 @@ protected void commit() throws IOException {


if (pos > 0) { if (pos > 0) {
// Sending the response header buffer // Sending the response header buffer
bbuf.put(headerBuffer, 0, pos); socketWriteBuffer.put(headerBuffer, 0, pos);
} }


} }
Expand All @@ -171,15 +162,15 @@ private synchronized void addToBB(byte[] buf, int offset, int length)
// leaves data in the buffer // leaves data in the buffer
while (length > 0) { while (length > 0) {
int thisTime = length; int thisTime = length;
if (bbuf.position() == bbuf.capacity()) { if (socketWriteBuffer.position() == socketWriteBuffer.capacity()) {
if (flushBuffer(isBlocking())) { if (flushBuffer(isBlocking())) {
break; break;
} }
} }
if (thisTime > bbuf.capacity() - bbuf.position()) { if (thisTime > socketWriteBuffer.capacity() - socketWriteBuffer.position()) {
thisTime = bbuf.capacity() - bbuf.position(); thisTime = socketWriteBuffer.capacity() - socketWriteBuffer.position();
} }
bbuf.put(buf, offset, thisTime); socketWriteBuffer.put(buf, offset, thisTime);
length = length - thisTime; length = length - thisTime;
offset = offset + thisTime; offset = offset + thisTime;
} }
Expand Down Expand Up @@ -216,7 +207,7 @@ protected synchronized boolean flushBuffer(boolean block)
ByteBufferHolder buffer = bufIter.next(); ByteBufferHolder buffer = bufIter.next();
buffer.flip(); buffer.flip();
while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) { while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) {
transfer(buffer.getBuf(), bbuf); transfer(buffer.getBuf(), socketWriteBuffer);
if (buffer.getBuf().remaining() == 0) { if (buffer.getBuf().remaining() == 0) {
bufIter.remove(); bufIter.remove();
} }
Expand Down Expand Up @@ -275,23 +266,23 @@ private synchronized void writeToSocket(boolean block) throws IOException {
private synchronized void writeToSocket() throws IOException { private synchronized void writeToSocket() throws IOException {
if (!flipped) { if (!flipped) {
flipped = true; flipped = true;
bbuf.flip(); socketWriteBuffer.flip();
} }


int written; int written;


do { do {
written = Socket.sendbb(socket, bbuf.position(), bbuf.remaining()); written = Socket.sendbb(socket, socketWriteBuffer.position(), socketWriteBuffer.remaining());
if (Status.APR_STATUS_IS_EAGAIN(-written)) { if (Status.APR_STATUS_IS_EAGAIN(-written)) {
written = 0; written = 0;
} else if (written < 0) { } else if (written < 0) {
throw new IOException("APR error: " + written); throw new IOException("APR error: " + written);
} }
bbuf.position(bbuf.position() + written); socketWriteBuffer.position(socketWriteBuffer.position() + written);
} while (written > 0 && bbuf.hasRemaining()); } while (written > 0 && socketWriteBuffer.hasRemaining());


if (bbuf.remaining() == 0) { if (socketWriteBuffer.remaining() == 0) {
bbuf.clear(); socketWriteBuffer.clear();
flipped = false; flipped = false;
} }
// If there is data left in the buffer the socket will be registered for // If there is data left in the buffer the socket will be registered for
Expand All @@ -314,8 +305,8 @@ private void transfer(ByteBuffer from, ByteBuffer to) {


@Override @Override
protected synchronized boolean hasMoreDataToFlush() { protected synchronized boolean hasMoreDataToFlush() {
return (flipped && bbuf.remaining() > 0) || return (flipped && socketWriteBuffer.remaining() > 0) ||
(!flipped && bbuf.position() > 0); (!flipped && socketWriteBuffer.position() > 0);
} }




Expand Down
38 changes: 18 additions & 20 deletions java/org/apache/coyote/http11/InternalNio2OutputBuffer.java
Expand Up @@ -108,6 +108,7 @@ public void init(SocketWrapperBase<Nio2Channel> socketWrapper,
AbstractEndpoint<Nio2Channel> associatedEndpoint) throws IOException { AbstractEndpoint<Nio2Channel> associatedEndpoint) throws IOException {
this.socket = socketWrapper; this.socket = socketWrapper;
this.endpoint = associatedEndpoint; this.endpoint = associatedEndpoint;
this.socketWriteBuffer = socket.getSocket().getBufHandler().getWriteBuffer();


this.completionHandler = new CompletionHandler<Integer, ByteBuffer>() { this.completionHandler = new CompletionHandler<Integer, ByteBuffer>() {
@Override @Override
Expand Down Expand Up @@ -292,14 +293,12 @@ private void addToBB(byte[] buf, int offset, int length)
if (socket == null || socket.getSocket() == null) if (socket == null || socket.getSocket() == null)
return; return;


ByteBuffer writeByteBuffer = socket.getSocket().getBufHandler().getWriteBuffer();

if (isBlocking()) { if (isBlocking()) {
while (length > 0) { while (length > 0) {
int thisTime = transfer(buf, offset, length, writeByteBuffer); int thisTime = transfer(buf, offset, length, socketWriteBuffer);
length = length - thisTime; length = length - thisTime;
offset = offset + thisTime; offset = offset + thisTime;
if (writeByteBuffer.remaining() == 0) { if (socketWriteBuffer.remaining() == 0) {
flushBuffer(true); flushBuffer(true);
} }
} }
Expand All @@ -316,7 +315,7 @@ private void addToBB(byte[] buf, int offset, int length)
synchronized (completionHandler) { synchronized (completionHandler) {
// No pending completion handler, so writing to the main buffer // No pending completion handler, so writing to the main buffer
// is possible // is possible
int thisTime = transfer(buf, offset, length, writeByteBuffer); int thisTime = transfer(buf, offset, length, socketWriteBuffer);
length = length - thisTime; length = length - thisTime;
offset = offset + thisTime; offset = offset + thisTime;
if (length > 0) { if (length > 0) {
Expand Down Expand Up @@ -363,7 +362,6 @@ private boolean flushBufferInternal(boolean block, boolean hasPermit) throws IOE
if (socket == null || socket.getSocket() == null) if (socket == null || socket.getSocket() == null)
return false; return false;


ByteBuffer byteBuffer = socket.getSocket().getBufHandler().getWriteBuffer();
if (block) { if (block) {
if (!isBlocking()) { if (!isBlocking()) {
// The final flush is blocking, but the processing was using // The final flush is blocking, but the processing was using
Expand All @@ -389,11 +387,11 @@ private boolean flushBufferInternal(boolean block, boolean hasPermit) throws IOE
bufferedWrites.clear(); bufferedWrites.clear();
} }
if (!flipped) { if (!flipped) {
byteBuffer.flip(); socketWriteBuffer.flip();
flipped = true; flipped = true;
} }
while (byteBuffer.hasRemaining()) { while (socketWriteBuffer.hasRemaining()) {
if (socket.getSocket().write(byteBuffer).get(socket.getTimeout(), TimeUnit.MILLISECONDS).intValue() < 0) { if (socket.getSocket().write(socketWriteBuffer).get(socket.getTimeout(), TimeUnit.MILLISECONDS).intValue() < 0) {
throw new EOFException(sm.getString("iob.failedwrite")); throw new EOFException(sm.getString("iob.failedwrite"));
} }
} }
Expand All @@ -408,22 +406,22 @@ private boolean flushBufferInternal(boolean block, boolean hasPermit) throws IOE
} catch (TimeoutException e) { } catch (TimeoutException e) {
throw new SocketTimeoutException(); throw new SocketTimeoutException();
} }
byteBuffer.clear(); socketWriteBuffer.clear();
flipped = false; flipped = false;
return false; return false;
} else { } else {
synchronized (completionHandler) { synchronized (completionHandler) {
if (hasPermit || writePending.tryAcquire()) { if (hasPermit || writePending.tryAcquire()) {
if (!flipped) { if (!flipped) {
byteBuffer.flip(); socketWriteBuffer.flip();
flipped = true; flipped = true;
} }
Nio2Endpoint.startInline(); Nio2Endpoint.startInline();
if (bufferedWrites.size() > 0) { if (bufferedWrites.size() > 0) {
// Gathering write of the main buffer plus all leftovers // Gathering write of the main buffer plus all leftovers
ArrayList<ByteBuffer> arrayList = new ArrayList<>(); ArrayList<ByteBuffer> arrayList = new ArrayList<>();
if (byteBuffer.hasRemaining()) { if (socketWriteBuffer.hasRemaining()) {
arrayList.add(byteBuffer); arrayList.add(socketWriteBuffer);
} }
for (ByteBuffer buffer : bufferedWrites) { for (ByteBuffer buffer : bufferedWrites) {
buffer.flip(); buffer.flip();
Expand All @@ -433,18 +431,18 @@ private boolean flushBufferInternal(boolean block, boolean hasPermit) throws IOE
ByteBuffer[] array = arrayList.toArray(EMPTY_BUF_ARRAY); ByteBuffer[] array = arrayList.toArray(EMPTY_BUF_ARRAY);
socket.getSocket().write(array, 0, array.length, socket.getTimeout(), socket.getSocket().write(array, 0, array.length, socket.getTimeout(),
TimeUnit.MILLISECONDS, array, gatherCompletionHandler); TimeUnit.MILLISECONDS, array, gatherCompletionHandler);
} else if (byteBuffer.hasRemaining()) { } else if (socketWriteBuffer.hasRemaining()) {
// Regular write // Regular write
socket.getSocket().write(byteBuffer, socket.getTimeout(), socket.getSocket().write(socketWriteBuffer, socket.getTimeout(),
TimeUnit.MILLISECONDS, byteBuffer, completionHandler); TimeUnit.MILLISECONDS, socketWriteBuffer, completionHandler);
} else { } else {
// Nothing was written // Nothing was written
writePending.release(); writePending.release();
} }
Nio2Endpoint.endInline(); Nio2Endpoint.endInline();
if (writePending.availablePermits() > 0) { if (writePending.availablePermits() > 0) {
if (byteBuffer.remaining() == 0) { if (socketWriteBuffer.remaining() == 0) {
byteBuffer.clear(); socketWriteBuffer.clear();
flipped = false; flipped = false;
} }
} }
Expand All @@ -464,8 +462,8 @@ public boolean hasDataToWrite() {


@Override @Override
protected boolean hasMoreDataToFlush() { protected boolean hasMoreDataToFlush() {
return (flipped && socket.getSocket().getBufHandler().getWriteBuffer().remaining() > 0) || return (flipped && socketWriteBuffer.remaining() > 0) ||
(!flipped && socket.getSocket().getBufHandler().getWriteBuffer().position() > 0); (!flipped && socketWriteBuffer.position() > 0);
} }


@Override @Override
Expand Down
27 changes: 12 additions & 15 deletions java/org/apache/coyote/http11/InternalNioOutputBuffer.java
Expand Up @@ -77,6 +77,7 @@ public void init(SocketWrapperBase<NioChannel> socketWrapper,


socket = socketWrapper.getSocket(); socket = socketWrapper.getSocket();
pool = ((NioEndpoint)endpoint).getSelectorPool(); pool = ((NioEndpoint)endpoint).getSelectorPool();
socketWriteBuffer = socket.getBufHandler().getWriteBuffer();
} }




Expand All @@ -87,11 +88,9 @@ public void init(SocketWrapperBase<NioChannel> socketWrapper,
@Override @Override
public void recycle() { public void recycle() {
super.recycle(); super.recycle();
if (socket != null) { socketWriteBuffer.clear();
socket.getBufHandler().getWriteBuffer().clear();
socket = null;
}
flipped = false; flipped = false;
socket = null;
} }




Expand All @@ -103,9 +102,8 @@ public void recycle() {
@Override @Override
public void sendAck() throws IOException { public void sendAck() throws IOException {
if (!committed) { if (!committed) {
socket.getBufHandler().getWriteBuffer().put( socketWriteBuffer.put(Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length);
Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length); int result = writeToSocket(socketWriteBuffer, true, true);
int result = writeToSocket(socket.getBufHandler().getWriteBuffer(), true, true);
if (result < 0) { if (result < 0) {
throw new IOException(sm.getString("iob.failedwrite.ack")); throw new IOException(sm.getString("iob.failedwrite.ack"));
} }
Expand Down Expand Up @@ -191,11 +189,10 @@ private synchronized void addToBB(byte[] buf, int offset, int length)
// Keep writing until all the data is written or a non-blocking write // Keep writing until all the data is written or a non-blocking write
// leaves data in the buffer // leaves data in the buffer
while (!dataLeft && length > 0) { while (!dataLeft && length > 0) {
int thisTime = transfer(buf,offset,length,socket.getBufHandler().getWriteBuffer()); int thisTime = transfer(buf,offset,length,socketWriteBuffer);
length = length - thisTime; length = length - thisTime;
offset = offset + thisTime; offset = offset + thisTime;
int written = writeToSocket(socket.getBufHandler().getWriteBuffer(), int written = writeToSocket(socketWriteBuffer, isBlocking(), true);
isBlocking(), true);
if (written == 0) { if (written == 0) {
dataLeft = true; dataLeft = true;
} else { } else {
Expand Down Expand Up @@ -241,7 +238,7 @@ protected boolean flushBuffer(boolean block) throws IOException {


//write to the socket, if there is anything to write //write to the socket, if there is anything to write
if (dataLeft) { if (dataLeft) {
writeToSocket(socket.getBufHandler().getWriteBuffer(),block, !flipped); writeToSocket(socketWriteBuffer, block, !flipped);
} }


dataLeft = hasMoreDataToFlush(); dataLeft = hasMoreDataToFlush();
Expand All @@ -252,11 +249,11 @@ protected boolean flushBuffer(boolean block) throws IOException {
ByteBufferHolder buffer = bufIter.next(); ByteBufferHolder buffer = bufIter.next();
buffer.flip(); buffer.flip();
while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) { while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) {
transfer(buffer.getBuf(), socket.getBufHandler().getWriteBuffer()); transfer(buffer.getBuf(), socketWriteBuffer);
if (buffer.getBuf().remaining() == 0) { if (buffer.getBuf().remaining() == 0) {
bufIter.remove(); bufIter.remove();
} }
writeToSocket(socket.getBufHandler().getWriteBuffer(),block, true); writeToSocket(socketWriteBuffer, block, true);
//here we must break if we didn't finish the write //here we must break if we didn't finish the write
} }
} }
Expand All @@ -268,8 +265,8 @@ protected boolean flushBuffer(boolean block) throws IOException {


@Override @Override
protected boolean hasMoreDataToFlush() { protected boolean hasMoreDataToFlush() {
return (flipped && socket.getBufHandler().getWriteBuffer().remaining()>0) || return (flipped && socketWriteBuffer.remaining() > 0) ||
(!flipped && socket.getBufHandler().getWriteBuffer().position() > 0); (!flipped && socketWriteBuffer.position() > 0);
} }




Expand Down

0 comments on commit eb2bd57

Please sign in to comment.