Skip to content

Commit

Permalink
Add SocketWrapper to AbstractOutputBuffer.
Browse files Browse the repository at this point in the history
While this allows a little code reduction now, the primary reason for
this is a step towards the goal of having a single OutputBuffer
implementation with the APR/NIO/NIO2 code moving to the SocketWrapper or
Endpoint as appropriate.

git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1650265 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
markt-asf committed Jan 8, 2015
1 parent d669455 commit 14bac67
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 58 deletions.
2 changes: 1 addition & 1 deletion java/org/apache/coyote/http11/AbstractHttp11Processor.java
Expand Up @@ -939,7 +939,7 @@ public SocketState process(SocketWrapperBase<S> socketWrapper)
// Setting up the I/O // Setting up the I/O
setSocketWrapper(socketWrapper); setSocketWrapper(socketWrapper);
getInputBuffer().init(socketWrapper, endpoint); getInputBuffer().init(socketWrapper, endpoint);
getOutputBuffer().init(socketWrapper, endpoint); getOutputBuffer().init(socketWrapper);


// Flags // Flags
keepAlive = true; keepAlive = true;
Expand Down
14 changes: 11 additions & 3 deletions java/org/apache/coyote/http11/AbstractOutputBuffer.java
Expand Up @@ -33,7 +33,6 @@
import org.apache.tomcat.util.buf.ByteChunk; import org.apache.tomcat.util.buf.ByteChunk;
import org.apache.tomcat.util.buf.MessageBytes; import org.apache.tomcat.util.buf.MessageBytes;
import org.apache.tomcat.util.http.HttpMessages; import org.apache.tomcat.util.http.HttpMessages;
import org.apache.tomcat.util.net.AbstractEndpoint;
import org.apache.tomcat.util.net.SocketWrapperBase; import org.apache.tomcat.util.net.SocketWrapperBase;
import org.apache.tomcat.util.res.StringManager; import org.apache.tomcat.util.res.StringManager;


Expand Down Expand Up @@ -95,6 +94,12 @@ public abstract class AbstractOutputBuffer<S> implements OutputBuffer {
*/ */
protected OutputBuffer outputStreamOutputBuffer; protected OutputBuffer outputStreamOutputBuffer;


/**
* Wrapper for socket where data will be written to.
*/
protected SocketWrapperBase<S> socketWrapper;


/** /**
* Bytes written to client for the current request * Bytes written to client for the current request
*/ */
Expand Down Expand Up @@ -316,6 +321,7 @@ public void reset() {
public void recycle() { public void recycle() {
// Sub-classes may wish to do more than this. // Sub-classes may wish to do more than this.
nextRequest(); nextRequest();
socketWrapper = null;
bufferedWrites.clear(); bufferedWrites.clear();
writeBufferFlipped = false; writeBufferFlipped = false;
} }
Expand Down Expand Up @@ -368,8 +374,10 @@ public void endRequest() throws IOException {
} }




public abstract void init(SocketWrapperBase<S> socketWrapper, public void init(SocketWrapperBase<S> socketWrapper) {
AbstractEndpoint<S> endpoint) throws IOException; this.socketWrapper = socketWrapper;
}



public abstract void sendAck() throws IOException; public abstract void sendAck() throws IOException;


Expand Down
20 changes: 7 additions & 13 deletions java/org/apache/coyote/http11/InternalAprOutputBuffer.java
Expand Up @@ -64,21 +64,16 @@ public InternalAprOutputBuffer(Response response, int headerBufferSize) {
private long socket; private long socket;




private SocketWrapperBase<Long> wrapper;


private AbstractEndpoint<Long> endpoint; private AbstractEndpoint<Long> endpoint;




// --------------------------------------------------------- Public Methods // --------------------------------------------------------- Public Methods


@Override @Override
public void init(SocketWrapperBase<Long> socketWrapper, public void init(SocketWrapperBase<Long> socketWrapper) {
AbstractEndpoint<Long> endpoint) throws IOException { super.init(socketWrapper);

wrapper = socketWrapper;
socket = socketWrapper.getSocket().longValue(); socket = socketWrapper.getSocket().longValue();
this.endpoint = endpoint; this.endpoint = socketWrapper.getEndpoint();


Socket.setsbb(this.socket, socketWriteBuffer); Socket.setsbb(this.socket, socketWriteBuffer);
} }
Expand All @@ -93,7 +88,6 @@ public void recycle() {
super.recycle(); super.recycle();
socketWriteBuffer.clear(); socketWriteBuffer.clear();
socket = 0; socket = 0;
wrapper = null;
} }




Expand Down Expand Up @@ -191,12 +185,12 @@ protected synchronized boolean flushBuffer(boolean block)


private synchronized void writeToSocket(boolean block) throws IOException { private synchronized void writeToSocket(boolean block) throws IOException {


Lock readLock = wrapper.getBlockingStatusReadLock(); Lock readLock = socketWrapper.getBlockingStatusReadLock();
WriteLock writeLock = wrapper.getBlockingStatusWriteLock(); WriteLock writeLock = socketWrapper.getBlockingStatusWriteLock();


readLock.lock(); readLock.lock();
try { try {
if (wrapper.getBlockingStatus() == block) { if (socketWrapper.getBlockingStatus() == block) {
writeToSocket(); writeToSocket();
return; return;
} }
Expand All @@ -207,7 +201,7 @@ private synchronized void writeToSocket(boolean block) throws IOException {
writeLock.lock(); writeLock.lock();
try { try {
// Set the current settings for this socket // Set the current settings for this socket
wrapper.setBlockingStatus(block); socketWrapper.setBlockingStatus(block);
if (block) { if (block) {
Socket.timeoutSet(socket, endpoint.getSoTimeout() * 1000); Socket.timeoutSet(socket, endpoint.getSoTimeout() * 1000);
} else { } else {
Expand Down
55 changes: 24 additions & 31 deletions java/org/apache/coyote/http11/InternalNio2OutputBuffer.java
Expand Up @@ -53,11 +53,6 @@ public InternalNio2OutputBuffer(Response response, int headerBufferSize) {


private static final ByteBuffer[] EMPTY_BUF_ARRAY = new ByteBuffer[0]; private static final ByteBuffer[] EMPTY_BUF_ARRAY = new ByteBuffer[0];


/**
* Underlying socket.
*/
private SocketWrapperBase<Nio2Channel> socket;

/** /**
* Track write interest * Track write interest
*/ */
Expand Down Expand Up @@ -96,11 +91,10 @@ public InternalNio2OutputBuffer(Response response, int headerBufferSize) {
// --------------------------------------------------------- Public Methods // --------------------------------------------------------- Public Methods


@Override @Override
public void init(SocketWrapperBase<Nio2Channel> socketWrapper, public void init(SocketWrapperBase<Nio2Channel> socketWrapper) {
AbstractEndpoint<Nio2Channel> associatedEndpoint) throws IOException { super.init(socketWrapper);
this.socket = socketWrapper; this.endpoint = socketWrapper.getEndpoint();
this.endpoint = associatedEndpoint; this.socketWriteBuffer = socketWrapper.getSocket().getBufHandler().getWriteBuffer();
this.socketWriteBuffer = socket.getSocket().getBufHandler().getWriteBuffer();


this.completionHandler = new CompletionHandler<Integer, ByteBuffer>() { this.completionHandler = new CompletionHandler<Integer, ByteBuffer>() {
@Override @Override
Expand All @@ -121,12 +115,12 @@ public void completed(Integer nBytes, ByteBuffer attachment) {
} }
bufferedWrites.clear(); bufferedWrites.clear();
ByteBuffer[] array = arrayList.toArray(EMPTY_BUF_ARRAY); ByteBuffer[] array = arrayList.toArray(EMPTY_BUF_ARRAY);
socket.getSocket().write(array, 0, array.length, socketWrapper.getSocket().write(array, 0, array.length,
socket.getTimeout(), TimeUnit.MILLISECONDS, socketWrapper.getTimeout(), TimeUnit.MILLISECONDS,
array, gatherCompletionHandler); array, gatherCompletionHandler);
} else if (attachment.hasRemaining()) { } else if (attachment.hasRemaining()) {
// Regular write // Regular write
socket.getSocket().write(attachment, socket.getTimeout(), socketWrapper.getSocket().write(attachment, socketWrapper.getTimeout(),
TimeUnit.MILLISECONDS, attachment, completionHandler); TimeUnit.MILLISECONDS, attachment, completionHandler);
} else { } else {
// All data has been written // All data has been written
Expand All @@ -138,21 +132,21 @@ public void completed(Integer nBytes, ByteBuffer attachment) {
} }
} }
if (notify) { if (notify) {
endpoint.processSocket(socket, SocketStatus.OPEN_WRITE, false); endpoint.processSocket(socketWrapper, SocketStatus.OPEN_WRITE, false);
} }
} }


@Override @Override
public void failed(Throwable exc, ByteBuffer attachment) { public void failed(Throwable exc, ByteBuffer attachment) {
socket.setError(true); socketWrapper.setError(true);
if (exc instanceof IOException) { if (exc instanceof IOException) {
e = (IOException) exc; e = (IOException) exc;
} else { } else {
e = new IOException(exc); e = new IOException(exc);
} }
response.getRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION, e); response.getRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION, e);
writePending.release(); writePending.release();
endpoint.processSocket(socket, SocketStatus.OPEN_WRITE, true); endpoint.processSocket(socketWrapper, SocketStatus.OPEN_WRITE, true);
} }
}; };
this.gatherCompletionHandler = new CompletionHandler<Long, ByteBuffer[]>() { this.gatherCompletionHandler = new CompletionHandler<Long, ByteBuffer[]>() {
Expand All @@ -176,8 +170,8 @@ public void completed(Long nBytes, ByteBuffer[] attachment) {
} }
bufferedWrites.clear(); bufferedWrites.clear();
ByteBuffer[] array = arrayList.toArray(EMPTY_BUF_ARRAY); ByteBuffer[] array = arrayList.toArray(EMPTY_BUF_ARRAY);
socket.getSocket().write(array, 0, array.length, socketWrapper.getSocket().write(array, 0, array.length,
socket.getTimeout(), TimeUnit.MILLISECONDS, socketWrapper.getTimeout(), TimeUnit.MILLISECONDS,
array, gatherCompletionHandler); array, gatherCompletionHandler);
} else { } else {
// All data has been written // All data has been written
Expand All @@ -189,21 +183,21 @@ public void completed(Long nBytes, ByteBuffer[] attachment) {
} }
} }
if (notify) { if (notify) {
endpoint.processSocket(socket, SocketStatus.OPEN_WRITE, false); endpoint.processSocket(socketWrapper, SocketStatus.OPEN_WRITE, false);
} }
} }


@Override @Override
public void failed(Throwable exc, ByteBuffer[] attachment) { public void failed(Throwable exc, ByteBuffer[] attachment) {
socket.setError(true); socketWrapper.setError(true);
if (exc instanceof IOException) { if (exc instanceof IOException) {
e = (IOException) exc; e = (IOException) exc;
} else { } else {
e = new IOException(exc); e = new IOException(exc);
} }
response.getRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION, e); response.getRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION, e);
writePending.release(); writePending.release();
endpoint.processSocket(socket, SocketStatus.OPEN_WRITE, true); endpoint.processSocket(socketWrapper, SocketStatus.OPEN_WRITE, true);
} }
}; };
} }
Expand All @@ -216,7 +210,6 @@ public void failed(Throwable exc, ByteBuffer[] attachment) {
@Override @Override
public void recycle() { public void recycle() {
super.recycle(); super.recycle();
socket = null;
e = null; e = null;
interest = false; interest = false;
if (writePending.availablePermits() != 1) { if (writePending.availablePermits() != 1) {
Expand Down Expand Up @@ -264,7 +257,7 @@ protected void addToBB(byte[] buf, int offset, int length)


if (length == 0) if (length == 0)
return; return;
if (socket == null || socket.getSocket() == null) if (socketWrapper == null || socketWrapper.getSocket() == null)
return; return;


if (isBlocking()) { if (isBlocking()) {
Expand All @@ -284,7 +277,7 @@ protected void addToBB(byte[] buf, int offset, int length)
// Also allows doing autoblocking // Also allows doing autoblocking
// Could be "smart" with coordination with the main CoyoteOutputStream to // Could be "smart" with coordination with the main CoyoteOutputStream to
// indicate the end of a write // indicate the end of a write
// Uses: if (writePending.tryAcquire(socket.getTimeout(), TimeUnit.MILLISECONDS)) // Uses: if (writePending.tryAcquire(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS))
if (writePending.tryAcquire()) { if (writePending.tryAcquire()) {
synchronized (completionHandler) { synchronized (completionHandler) {
// No pending completion handler, so writing to the main buffer // No pending completion handler, so writing to the main buffer
Expand Down Expand Up @@ -326,15 +319,15 @@ protected boolean flushBuffer(boolean block) throws IOException {
} }


private boolean flushBufferInternal(boolean block, boolean hasPermit) throws IOException { private boolean flushBufferInternal(boolean block, boolean hasPermit) throws IOException {
if (socket == null || socket.getSocket() == null) if (socketWrapper == null || socketWrapper.getSocket() == null)
return false; return false;


if (block) { if (block) {
if (!isBlocking()) { if (!isBlocking()) {
// The final flush is blocking, but the processing was using // The final flush is blocking, but the processing was using
// non blocking so wait until an async write is done // non blocking so wait until an async write is done
try { try {
if (writePending.tryAcquire(socket.getTimeout(), TimeUnit.MILLISECONDS)) { if (writePending.tryAcquire(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS)) {
writePending.release(); writePending.release();
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
Expand All @@ -346,7 +339,7 @@ private boolean flushBufferInternal(boolean block, boolean hasPermit) throws IOE
for (ByteBuffer buffer : bufferedWrites) { for (ByteBuffer buffer : bufferedWrites) {
buffer.flip(); buffer.flip();
while (buffer.hasRemaining()) { while (buffer.hasRemaining()) {
if (socket.getSocket().write(buffer).get(socket.getTimeout(), TimeUnit.MILLISECONDS).intValue() < 0) { if (socketWrapper.getSocket().write(buffer).get(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS).intValue() < 0) {
throw new EOFException(sm.getString("iob.failedwrite")); throw new EOFException(sm.getString("iob.failedwrite"));
} }
} }
Expand All @@ -358,7 +351,7 @@ private boolean flushBufferInternal(boolean block, boolean hasPermit) throws IOE
writeBufferFlipped = true; writeBufferFlipped = true;
} }
while (socketWriteBuffer.hasRemaining()) { while (socketWriteBuffer.hasRemaining()) {
if (socket.getSocket().write(socketWriteBuffer).get(socket.getTimeout(), TimeUnit.MILLISECONDS).intValue() < 0) { if (socketWrapper.getSocket().write(socketWriteBuffer).get(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS).intValue() < 0) {
throw new EOFException(sm.getString("iob.failedwrite")); throw new EOFException(sm.getString("iob.failedwrite"));
} }
} }
Expand Down Expand Up @@ -396,11 +389,11 @@ private boolean flushBufferInternal(boolean block, boolean hasPermit) throws IOE
} }
bufferedWrites.clear(); bufferedWrites.clear();
ByteBuffer[] array = arrayList.toArray(EMPTY_BUF_ARRAY); ByteBuffer[] array = arrayList.toArray(EMPTY_BUF_ARRAY);
socket.getSocket().write(array, 0, array.length, socket.getTimeout(), socketWrapper.getSocket().write(array, 0, array.length, socketWrapper.getTimeout(),
TimeUnit.MILLISECONDS, array, gatherCompletionHandler); TimeUnit.MILLISECONDS, array, gatherCompletionHandler);
} else if (socketWriteBuffer.hasRemaining()) { } else if (socketWriteBuffer.hasRemaining()) {
// Regular write // Regular write
socket.getSocket().write(socketWriteBuffer, socket.getTimeout(), socketWrapper.getSocket().write(socketWriteBuffer, socketWrapper.getTimeout(),
TimeUnit.MILLISECONDS, socketWriteBuffer, completionHandler); TimeUnit.MILLISECONDS, socketWriteBuffer, completionHandler);
} else { } else {
// Nothing was written // Nothing was written
Expand Down Expand Up @@ -439,7 +432,7 @@ protected void registerWriteInterest() {
interest = true; interest = true;
} else { } else {
// If no write is pending, notify // If no write is pending, notify
endpoint.processSocket(socket, SocketStatus.OPEN_WRITE, true); endpoint.processSocket(socketWrapper, SocketStatus.OPEN_WRITE, true);
} }
} }
} }
Expand Down
12 changes: 5 additions & 7 deletions java/org/apache/coyote/http11/InternalNioOutputBuffer.java
Expand Up @@ -25,7 +25,6 @@


import org.apache.coyote.ByteBufferHolder; import org.apache.coyote.ByteBufferHolder;
import org.apache.coyote.Response; import org.apache.coyote.Response;
import org.apache.tomcat.util.net.AbstractEndpoint;
import org.apache.tomcat.util.net.NioChannel; import org.apache.tomcat.util.net.NioChannel;
import org.apache.tomcat.util.net.NioEndpoint; import org.apache.tomcat.util.net.NioEndpoint;
import org.apache.tomcat.util.net.NioSelectorPool; import org.apache.tomcat.util.net.NioSelectorPool;
Expand Down Expand Up @@ -62,11 +61,10 @@ public InternalNioOutputBuffer(Response response, int headerBufferSize) {
// --------------------------------------------------------- Public Methods // --------------------------------------------------------- Public Methods


@Override @Override
public void init(SocketWrapperBase<NioChannel> socketWrapper, public void init(SocketWrapperBase<NioChannel> socketWrapper) {
AbstractEndpoint<NioChannel> endpoint) throws IOException { super.init(socketWrapper);

socket = socketWrapper.getSocket(); socket = socketWrapper.getSocket();
pool = ((NioEndpoint)endpoint).getSelectorPool(); pool = ((NioEndpoint)socketWrapper.getEndpoint()).getSelectorPool();
socketWriteBuffer = socket.getBufHandler().getWriteBuffer(); socketWriteBuffer = socket.getBufHandler().getWriteBuffer();
} }


Expand Down Expand Up @@ -170,8 +168,8 @@ protected synchronized void addToBB(byte[] buf, int offset, int length)
} }
} }


NioEndpoint.NioSocketWrapper ka = (NioEndpoint.NioSocketWrapper)socket.getAttachment(); // Prevent timeouts for just doing client writes
if (ka != null) ka.access();//prevent timeouts for just doing client writes socketWrapper.access();


if (!isBlocking() && length > 0) { if (!isBlocking() && length > 0) {
// Remaining data must be buffered // Remaining data must be buffered
Expand Down
Expand Up @@ -24,7 +24,6 @@
import org.apache.coyote.Response; import org.apache.coyote.Response;
import org.apache.coyote.http11.AbstractOutputBuffer; import org.apache.coyote.http11.AbstractOutputBuffer;
import org.apache.tomcat.util.buf.ByteChunk; import org.apache.tomcat.util.buf.ByteChunk;
import org.apache.tomcat.util.net.AbstractEndpoint;
import org.apache.tomcat.util.net.SocketWrapperBase; import org.apache.tomcat.util.net.SocketWrapperBase;


/** /**
Expand All @@ -47,8 +46,7 @@ public TesterOutputBuffer(Response response, int headerBufferSize) {
// --------------------------------------------------------- Public Methods // --------------------------------------------------------- Public Methods


@Override @Override
public void init(SocketWrapperBase<Socket> socketWrapper, public void init(SocketWrapperBase<Socket> socketWrapper) {
AbstractEndpoint<Socket> endpoint) throws IOException {
// NO-OP: Unused // NO-OP: Unused
} }


Expand Down

0 comments on commit 14bac67

Please sign in to comment.