Skip to content

Commit

Permalink
Move the additional dispatches required from the SocketWrapper to the
Browse files Browse the repository at this point in the history
Processor

git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1709543 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
markt-asf committed Oct 20, 2015
1 parent 40bf6fc commit c648adc
Show file tree
Hide file tree
Showing 11 changed files with 75 additions and 86 deletions.
42 changes: 42 additions & 0 deletions java/org/apache/coyote/AbstractProcessorLight.java
Expand Up @@ -16,11 +16,53 @@
*/ */
package org.apache.coyote; package org.apache.coyote;


import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

import org.apache.tomcat.util.net.DispatchType;

/** /**
* This is a light-weight abstract processor implementation that is intended as * This is a light-weight abstract processor implementation that is intended as
* a basis for all Processor implementations from the light-weight upgrade * a basis for all Processor implementations from the light-weight upgrade
* processors to the HTTP/AJP processors. * processors to the HTTP/AJP processors.
*/ */
public abstract class AbstractProcessorLight implements Processor { public abstract class AbstractProcessorLight implements Processor {


private Set<DispatchType> dispatches = new CopyOnWriteArraySet<>();


@Override
public void addDispatch(DispatchType dispatchType) {
synchronized (dispatches) {
dispatches.add(dispatchType);
}
}


@Override
public Iterator<DispatchType> getIteratorAndClearDispatches() {
// Note: Logic in AbstractProtocol depends on this method only returning
// a non-null value if the iterator is non-empty. i.e. it should never
// return an empty iterator.
Iterator<DispatchType> result;
synchronized (dispatches) {
// Synchronized as the generation of the iterator and the clearing
// of dispatches needs to be an atomic operation.
result = dispatches.iterator();
if (result.hasNext()) {
dispatches.clear();
} else {
result = null;
}
}
return result;
}


protected void clearDispatches() {
synchronized (dispatches) {
dispatches.clear();
}
}
} }
2 changes: 1 addition & 1 deletion java/org/apache/coyote/AbstractProtocol.java
Expand Up @@ -758,7 +758,7 @@ public SocketState process(SocketWrapperBase<S> wrapper,
if (dispatches == null || !dispatches.hasNext()) { if (dispatches == null || !dispatches.hasNext()) {
// Only returns non-null iterator if there are // Only returns non-null iterator if there are
// dispatches to process. // dispatches to process.
dispatches = wrapper.getIteratorAndClearDispatches(); dispatches = processor.getIteratorAndClearDispatches();
} }
} while (state == SocketState.ASYNC_END || } while (state == SocketState.ASYNC_END ||
state == SocketState.UPGRADING || state == SocketState.UPGRADING ||
Expand Down
6 changes: 6 additions & 0 deletions java/org/apache/coyote/Processor.java
Expand Up @@ -18,11 +18,13 @@


import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;


import javax.servlet.http.HttpUpgradeHandler; import javax.servlet.http.HttpUpgradeHandler;


import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
import org.apache.tomcat.util.net.DispatchType;
import org.apache.tomcat.util.net.SSLSupport; import org.apache.tomcat.util.net.SSLSupport;
import org.apache.tomcat.util.net.SocketStatus; import org.apache.tomcat.util.net.SocketStatus;
import org.apache.tomcat.util.net.SocketWrapperBase; import org.apache.tomcat.util.net.SocketWrapperBase;
Expand Down Expand Up @@ -96,4 +98,8 @@ public interface Processor {
* an existing multiplexed connection. * an existing multiplexed connection.
*/ */
void pause(); void pause();

void addDispatch(DispatchType dispatchType);

Iterator<DispatchType> getIteratorAndClearDispatches();
} }
8 changes: 4 additions & 4 deletions java/org/apache/coyote/ajp/AjpProcessor.java
Expand Up @@ -483,7 +483,7 @@ public final void action(ActionCode actionCode, Object param) {
break; break;
} }
case ASYNC_COMPLETE: { case ASYNC_COMPLETE: {
socketWrapper.clearDispatches(); clearDispatches();
if (asyncStateMachine.asyncComplete()) { if (asyncStateMachine.asyncComplete()) {
socketWrapper.processSocket(SocketStatus.OPEN_READ, true); socketWrapper.processSocket(SocketStatus.OPEN_READ, true);
} }
Expand Down Expand Up @@ -573,15 +573,15 @@ public final void action(ActionCode actionCode, Object param) {
break; break;
} }
case DISPATCH_READ: { case DISPATCH_READ: {
socketWrapper.addDispatch(DispatchType.NON_BLOCKING_READ); addDispatch(DispatchType.NON_BLOCKING_READ);
break; break;
} }
case DISPATCH_WRITE: { case DISPATCH_WRITE: {
socketWrapper.addDispatch(DispatchType.NON_BLOCKING_WRITE); addDispatch(DispatchType.NON_BLOCKING_WRITE);
break; break;
} }
case DISPATCH_EXECUTE: { case DISPATCH_EXECUTE: {
socketWrapper.executeNonBlockingDispatches(); socketWrapper.executeNonBlockingDispatches(getIteratorAndClearDispatches());
break; break;
} }
case CLOSE_NOW: { case CLOSE_NOW: {
Expand Down
8 changes: 4 additions & 4 deletions java/org/apache/coyote/http11/Http11Processor.java
Expand Up @@ -731,7 +731,7 @@ public final void action(ActionCode actionCode, Object param) {
break; break;
} }
case ASYNC_COMPLETE: { case ASYNC_COMPLETE: {
socketWrapper.clearDispatches(); clearDispatches();
if (asyncStateMachine.asyncComplete()) { if (asyncStateMachine.asyncComplete()) {
socketWrapper.processSocket(SocketStatus.OPEN_READ, true); socketWrapper.processSocket(SocketStatus.OPEN_READ, true);
} }
Expand Down Expand Up @@ -776,17 +776,17 @@ public final void action(ActionCode actionCode, Object param) {
break; break;
} }
case DISPATCH_READ: { case DISPATCH_READ: {
socketWrapper.addDispatch(DispatchType.NON_BLOCKING_READ); addDispatch(DispatchType.NON_BLOCKING_READ);
break; break;
} }
case DISPATCH_WRITE: { case DISPATCH_WRITE: {
socketWrapper.addDispatch(DispatchType.NON_BLOCKING_WRITE); addDispatch(DispatchType.NON_BLOCKING_WRITE);
break; break;
} }
case DISPATCH_EXECUTE: { case DISPATCH_EXECUTE: {
SocketWrapperBase<?> wrapper = socketWrapper; SocketWrapperBase<?> wrapper = socketWrapper;
if (wrapper != null) { if (wrapper != null) {
wrapper.executeNonBlockingDispatches(); wrapper.executeNonBlockingDispatches(getIteratorAndClearDispatches());
} }
break; break;
} }
Expand Down
Expand Up @@ -43,8 +43,8 @@ public class UpgradeProcessorExternal extends UpgradeProcessorBase {
public UpgradeProcessorExternal(SocketWrapperBase<?> wrapper, ByteBuffer leftOverInput, public UpgradeProcessorExternal(SocketWrapperBase<?> wrapper, ByteBuffer leftOverInput,
HttpUpgradeHandler httpUpgradeHandler) { HttpUpgradeHandler httpUpgradeHandler) {
super(wrapper, leftOverInput, httpUpgradeHandler); super(wrapper, leftOverInput, httpUpgradeHandler);
this.upgradeServletInputStream = new UpgradeServletInputStream(wrapper); this.upgradeServletInputStream = new UpgradeServletInputStream(this, wrapper);
this.upgradeServletOutputStream = new UpgradeServletOutputStream(wrapper); this.upgradeServletOutputStream = new UpgradeServletOutputStream(this, wrapper);


wrapper.unRead(leftOverInput); wrapper.unRead(leftOverInput);
/* /*
Expand Down
Expand Up @@ -22,6 +22,7 @@
import javax.servlet.ServletInputStream; import javax.servlet.ServletInputStream;


import org.apache.coyote.ContainerThreadMarker; import org.apache.coyote.ContainerThreadMarker;
import org.apache.coyote.Processor;
import org.apache.juli.logging.Log; import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory; import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.ExceptionUtils; import org.apache.tomcat.util.ExceptionUtils;
Expand All @@ -35,6 +36,7 @@ public class UpgradeServletInputStream extends ServletInputStream {
private static final StringManager sm = private static final StringManager sm =
StringManager.getManager(UpgradeServletInputStream.class); StringManager.getManager(UpgradeServletInputStream.class);


private final Processor processor;
private final SocketWrapperBase<?> socketWrapper; private final SocketWrapperBase<?> socketWrapper;


private volatile boolean closed = false; private volatile boolean closed = false;
Expand All @@ -45,7 +47,8 @@ public class UpgradeServletInputStream extends ServletInputStream {
private volatile ClassLoader applicationLoader = null; private volatile ClassLoader applicationLoader = null;




public UpgradeServletInputStream(SocketWrapperBase<?> socketWrapper) { public UpgradeServletInputStream(Processor processor, SocketWrapperBase<?> socketWrapper) {
this.processor = processor;
this.socketWrapper = socketWrapper; this.socketWrapper = socketWrapper;
} }


Expand Down Expand Up @@ -101,7 +104,7 @@ public final void setReadListener(ReadListener listener) {


// Container is responsible for first call to onDataAvailable(). // Container is responsible for first call to onDataAvailable().
if (ContainerThreadMarker.isContainerThread()) { if (ContainerThreadMarker.isContainerThread()) {
socketWrapper.addDispatch(DispatchType.NON_BLOCKING_READ); processor.addDispatch(DispatchType.NON_BLOCKING_READ);
} else { } else {
socketWrapper.registerReadInterest(); socketWrapper.registerReadInterest();
} }
Expand Down
Expand Up @@ -22,6 +22,7 @@
import javax.servlet.WriteListener; import javax.servlet.WriteListener;


import org.apache.coyote.ContainerThreadMarker; import org.apache.coyote.ContainerThreadMarker;
import org.apache.coyote.Processor;
import org.apache.juli.logging.Log; import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory; import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.ExceptionUtils; import org.apache.tomcat.util.ExceptionUtils;
Expand All @@ -35,6 +36,7 @@ public class UpgradeServletOutputStream extends ServletOutputStream {
private static final StringManager sm = private static final StringManager sm =
StringManager.getManager(UpgradeServletOutputStream.class); StringManager.getManager(UpgradeServletOutputStream.class);


private final Processor processor;
private final SocketWrapperBase<?> socketWrapper; private final SocketWrapperBase<?> socketWrapper;


// Used to ensure that isReady() and onWritePossible() have a consistent // Used to ensure that isReady() and onWritePossible() have a consistent
Expand All @@ -61,7 +63,8 @@ public class UpgradeServletOutputStream extends ServletOutputStream {
private volatile ClassLoader applicationLoader = null; private volatile ClassLoader applicationLoader = null;




public UpgradeServletOutputStream(SocketWrapperBase<?> socketWrapper) { public UpgradeServletOutputStream(Processor processor, SocketWrapperBase<?> socketWrapper) {
this.processor = processor;
this.socketWrapper = socketWrapper; this.socketWrapper = socketWrapper;
} }


Expand Down Expand Up @@ -115,7 +118,7 @@ public final void setWriteListener(WriteListener listener) {
registered = true; registered = true;
// Container is responsible for first call to onDataAvailable(). // Container is responsible for first call to onDataAvailable().
if (ContainerThreadMarker.isContainerThread()) { if (ContainerThreadMarker.isContainerThread()) {
socketWrapper.addDispatch(DispatchType.NON_BLOCKING_WRITE); processor.addDispatch(DispatchType.NON_BLOCKING_WRITE);
} else { } else {
socketWrapper.registerWriteInterest(); socketWrapper.registerWriteInterest();
} }
Expand Down
36 changes: 2 additions & 34 deletions java/org/apache/coyote/http2/StreamProcessor.java
Expand Up @@ -19,8 +19,6 @@
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Iterator; import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;


import javax.servlet.http.HttpUpgradeHandler; import javax.servlet.http.HttpUpgradeHandler;
Expand Down Expand Up @@ -48,7 +46,6 @@ public class StreamProcessor extends AbstractProcessor implements Runnable {
private static final StringManager sm = StringManager.getManager(StreamProcessor.class); private static final StringManager sm = StringManager.getManager(StreamProcessor.class);


private final Stream stream; private final Stream stream;
private Set<DispatchType> dispatches = new CopyOnWriteArraySet<>();


private volatile SSLSupport sslSupport; private volatile SSLSupport sslSupport;


Expand Down Expand Up @@ -374,11 +371,11 @@ public void action(ActionCode actionCode, Object param) {
break; break;
} }
case DISPATCH_READ: { case DISPATCH_READ: {
dispatches.add(DispatchType.NON_BLOCKING_READ); addDispatch(DispatchType.NON_BLOCKING_READ);
break; break;
} }
case DISPATCH_WRITE: { case DISPATCH_WRITE: {
dispatches.add(DispatchType.NON_BLOCKING_WRITE); addDispatch(DispatchType.NON_BLOCKING_WRITE);
break; break;
} }
case DISPATCH_EXECUTE: { case DISPATCH_EXECUTE: {
Expand Down Expand Up @@ -482,35 +479,6 @@ protected SocketState dispatchEndRequest() {
} }




public void addDispatch(DispatchType dispatchType) {
synchronized (dispatches) {
dispatches.add(dispatchType);
}
}
public Iterator<DispatchType> getIteratorAndClearDispatches() {
// Note: Logic in AbstractProtocol depends on this method only returning
// a non-null value if the iterator is non-empty. i.e. it should never
// return an empty iterator.
Iterator<DispatchType> result;
synchronized (dispatches) {
// Synchronized as the generation of the iterator and the clearing
// of dispatches needs to be an atomic operation.
result = dispatches.iterator();
if (result.hasNext()) {
dispatches.clear();
} else {
result = null;
}
}
return result;
}
public void clearDispatches() {
synchronized (dispatches) {
dispatches.clear();
}
}


@Override @Override
public HttpUpgradeHandler getHttpUpgradeHandler() { public HttpUpgradeHandler getHttpUpgradeHandler() {
// Should never happen // Should never happen
Expand Down
5 changes: 2 additions & 3 deletions java/org/apache/tomcat/util/net/AbstractEndpoint.java
Expand Up @@ -811,7 +811,8 @@ public abstract void processSocket(SocketWrapperBase<S> socketWrapper,
SocketStatus socketStatus, boolean dispatch); SocketStatus socketStatus, boolean dispatch);




public void executeNonBlockingDispatches(SocketWrapperBase<S> socketWrapper) { public void executeNonBlockingDispatches(SocketWrapperBase<S> socketWrapper,
Iterator<DispatchType> dispatches) {
/* /*
* This method is called when non-blocking IO is initiated by defining * This method is called when non-blocking IO is initiated by defining
* a read and/or write listener in a non-container thread. It is called * a read and/or write listener in a non-container thread. It is called
Expand All @@ -831,8 +832,6 @@ public void executeNonBlockingDispatches(SocketWrapperBase<S> socketWrapper) {
* sure that the socket has been added to the waitingRequests queue. * sure that the socket has been added to the waitingRequests queue.
*/ */
synchronized (socketWrapper) { synchronized (socketWrapper) {
Iterator<DispatchType> dispatches = socketWrapper.getIteratorAndClearDispatches();

while (dispatches != null && dispatches.hasNext()) { while (dispatches != null && dispatches.hasNext()) {
DispatchType dispatchType = dispatches.next(); DispatchType dispatchType = dispatches.next();
processSocket(socketWrapper, dispatchType.getSocketStatus(), false); processSocket(socketWrapper, dispatchType.getSocketStatus(), false);
Expand Down
36 changes: 2 additions & 34 deletions java/org/apache/tomcat/util/net/SocketWrapperBase.java
Expand Up @@ -20,8 +20,6 @@
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler; import java.nio.channels.CompletionHandler;
import java.util.Iterator; import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -98,8 +96,6 @@ public abstract class SocketWrapperBase<E> {
*/ */
protected int bufferedWriteSize = 64*1024; //64k default write buffer protected int bufferedWriteSize = 64*1024; //64k default write buffer


private Set<DispatchType> dispatches = new CopyOnWriteArraySet<>();

public SocketWrapperBase(E socket, AbstractEndpoint<E> endpoint) { public SocketWrapperBase(E socket, AbstractEndpoint<E> endpoint) {
this.socket = socket; this.socket = socket;
this.endpoint = endpoint; this.endpoint = endpoint;
Expand Down Expand Up @@ -298,34 +294,6 @@ public boolean canWrite() {
return socketBufferHandler.isWriteBufferWritable() && bufferedWrites.size() == 0; return socketBufferHandler.isWriteBufferWritable() && bufferedWrites.size() == 0;
} }


public void addDispatch(DispatchType dispatchType) {
synchronized (dispatches) {
dispatches.add(dispatchType);
}
}
public Iterator<DispatchType> getIteratorAndClearDispatches() {
// Note: Logic in AbstractProtocol depends on this method only returning
// a non-null value if the iterator is non-empty. i.e. it should never
// return an empty iterator.
Iterator<DispatchType> result;
synchronized (dispatches) {
// Synchronized as the generation of the iterator and the clearing
// of dispatches needs to be an atomic operation.
result = dispatches.iterator();
if (result.hasNext()) {
dispatches.clear();
} else {
result = null;
}
}
return result;
}
public void clearDispatches() {
synchronized (dispatches) {
dispatches.clear();
}
}



/** /**
* Overridden for debug purposes. No guarantees are made about the format of * Overridden for debug purposes. No guarantees are made about the format of
Expand Down Expand Up @@ -587,8 +555,8 @@ public void processSocket(SocketStatus socketStatus, boolean dispatch) {
} }




public void executeNonBlockingDispatches() { public void executeNonBlockingDispatches(Iterator<DispatchType> dispatches) {
endpoint.executeNonBlockingDispatches(this); endpoint.executeNonBlockingDispatches(this, dispatches);
} }




Expand Down

0 comments on commit c648adc

Please sign in to comment.