From b25bc1731d432c55b80fe2b4c1e015eb699f5258 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Wed, 18 Jan 2017 09:01:56 +0200 Subject: [PATCH 01/11] Add WsSession suspend/resume functionality --- java/org/apache/coyote/AbstractProtocol.java | 4 + .../tomcat/util/net/AbstractEndpoint.java | 2 +- .../tomcat/websocket/LocalStrings.properties | 4 + .../websocket/SuspendableMessageReceiver.java | 24 +++ .../apache/tomcat/websocket/WsFrameBase.java | 150 +++++++++++++++++- .../tomcat/websocket/WsFrameClient.java | 131 ++++++++++----- .../apache/tomcat/websocket/WsSession.java | 20 ++- .../websocket/server/LocalStrings.properties | 1 + .../websocket/server/WsFrameServer.java | 69 +++++++- .../server/WsHttpUpgradeHandler.java | 2 +- .../websocket/TestWsSessionSuspendResume.java | 144 +++++++++++++++++ 11 files changed, 507 insertions(+), 44 deletions(-) create mode 100644 java/org/apache/tomcat/websocket/SuspendableMessageReceiver.java create mode 100644 test/org/apache/tomcat/websocket/TestWsSessionSuspendResume.java diff --git a/java/org/apache/coyote/AbstractProtocol.java b/java/org/apache/coyote/AbstractProtocol.java index 1afd75e814d9..70f5216ffe4b 100644 --- a/java/org/apache/coyote/AbstractProtocol.java +++ b/java/org/apache/coyote/AbstractProtocol.java @@ -880,6 +880,10 @@ public SocketState process(SocketWrapperBase wrapper, SocketEvent status) { if (status != SocketEvent.OPEN_WRITE) { longPoll(wrapper, processor); } + } else if (state == SocketState.SUSPENDED) { + // Don't add sockets back to the poller. + // The resumeProcessing() method will add this socket + // to the poller. } else { // Connection closed. OK to recycle the processor. Upgrade // processors are not recycled. diff --git a/java/org/apache/tomcat/util/net/AbstractEndpoint.java b/java/org/apache/tomcat/util/net/AbstractEndpoint.java index ed086f50aac6..d739a2bc91d8 100644 --- a/java/org/apache/tomcat/util/net/AbstractEndpoint.java +++ b/java/org/apache/tomcat/util/net/AbstractEndpoint.java @@ -68,7 +68,7 @@ public static interface Handler { public enum SocketState { // TODO Add a new state to the AsyncStateMachine and remove // ASYNC_END (if possible) - OPEN, CLOSED, LONG, ASYNC_END, SENDFILE, UPGRADING, UPGRADED + OPEN, CLOSED, LONG, ASYNC_END, SENDFILE, UPGRADING, UPGRADED, SUSPENDED } diff --git a/java/org/apache/tomcat/websocket/LocalStrings.properties b/java/org/apache/tomcat/websocket/LocalStrings.properties index 7be9a5ad546e..67d66715033e 100644 --- a/java/org/apache/tomcat/websocket/LocalStrings.properties +++ b/java/org/apache/tomcat/websocket/LocalStrings.properties @@ -52,12 +52,15 @@ util.unknownDecoderType=The Decoder type [{0}] is not recognized # frames and therefore must be 123 bytes (not characters) or less in length. # Messages are encoded using UTF-8 where a single character may be encoded in # as many as 4 bytes. +wsFrame.alreadyResumed=Message receiving has already been resumed. +wsFrame.alreadySuspended=Message receiving has already been suspended. wsFrame.bufferTooSmall=No async message support and buffer too small. Buffer size: [{0}], Message size: [{1}] wsFrame.byteToLongFail=Too many bytes ([{0}]) were provided to be converted into a long wsFrame.closed=New frame received after a close control frame wsFrame.controlFragmented=A fragmented control frame was received but control frames may not be fragmented wsFrame.controlPayloadTooBig=A control frame was sent with a payload of size [{0}] which is larger than the maximum permitted of 125 bytes wsFrame.controlNoFin=A control frame was sent that did not have the fin bit set. Control frames are not permitted to use continuation frames. +wsFrame.illegalReadState=Unexpected read state [{0}] wsFrame.invalidOpCode= A WebSocket frame was sent with an unrecognised opCode of [{0}] wsFrame.invalidUtf8=A WebSocket text frame was received that could not be decoded to UTF-8 because it contained invalid byte sequences wsFrame.invalidUtf8Close=A WebSocket close frame was received with a close reason that contained invalid UTF-8 byte sequences @@ -68,6 +71,7 @@ wsFrame.notMasked=The client frame was not masked but all client frames must be wsFrame.oneByteCloseCode=The client sent a close frame with a single byte payload which is not valid wsFrame.partialHeaderComplete=WebSocket frame received. fin [{0}], rsv [{1}], OpCode [{2}], payload length [{3}] wsFrame.sessionClosed=The client data cannot be processed because the session has already been closed +wsFrame.suspendRequested=Suspend of the message receiving has already been requested. wsFrame.textMessageTooBig=The decoded text message was too big for the output buffer and the endpoint does not support partial messages wsFrame.wrongRsv=The client frame set the reserved bits to [{0}] for a message with opCode [{1}] which was not supported by this endpoint diff --git a/java/org/apache/tomcat/websocket/SuspendableMessageReceiver.java b/java/org/apache/tomcat/websocket/SuspendableMessageReceiver.java new file mode 100644 index 000000000000..fd0e971bb587 --- /dev/null +++ b/java/org/apache/tomcat/websocket/SuspendableMessageReceiver.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.websocket; + +public interface SuspendableMessageReceiver { + + void suspend(); + + void resume(); +} diff --git a/java/org/apache/tomcat/websocket/WsFrameBase.java b/java/org/apache/tomcat/websocket/WsFrameBase.java index 2495812e77d2..40bf023a6fcf 100644 --- a/java/org/apache/tomcat/websocket/WsFrameBase.java +++ b/java/org/apache/tomcat/websocket/WsFrameBase.java @@ -23,6 +23,7 @@ import java.nio.charset.CoderResult; import java.nio.charset.CodingErrorAction; import java.util.List; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.websocket.CloseReason; import javax.websocket.CloseReason.CloseCodes; @@ -40,7 +41,7 @@ * extracts the messages. WebSocket Pings received will be responded to * automatically without any action required by the application. */ -public abstract class WsFrameBase { +public abstract class WsFrameBase implements SuspendableMessageReceiver { private static final StringManager sm = StringManager.getManager(WsFrameBase.class); @@ -84,11 +85,16 @@ public abstract class WsFrameBase { private volatile State state = State.NEW_FRAME; private volatile boolean open = true; + private static final AtomicReferenceFieldUpdater READ_STATE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(WsFrameBase.class, ReadState.class, "readState"); + private volatile ReadState readState = ReadState.READY; + public WsFrameBase(WsSession wsSession, Transformation transformation) { inputBuffer = ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE); inputBuffer.position(0).limit(0); messageBufferBinary = ByteBuffer.allocate(wsSession.getMaxBinaryMessageBufferSize()); messageBufferText = CharBuffer.allocate(wsSession.getMaxTextMessageBufferSize()); + wsSession.setSuspendableMessageReceiver(this); this.wsSession = wsSession; Transformation finalTransformation; if (isMasked()) { @@ -106,7 +112,7 @@ public WsFrameBase(WsSession wsSession, Transformation transformation) { protected void processInputBuffer() throws IOException { - while (true) { + while (!isSuspended()) { wsSession.updateLastActive(); if (state == State.NEW_FRAME) { if (!processInitialHeader()) { @@ -687,6 +693,146 @@ private static enum State { } + /** + * READY - not suspended, waiting for notification for data available, + * socket registered to the poller (server case) + * READ - reading the available data, not suspended + * READ_SUSPENDING - suspended, finishing read operation + * READY_SUSPENDING - suspended, waiting for notification for data + * available, socket registered to the poller (server case) + * SUSPENDED - suspended, read operation finished/notification for data + * available received + * + *
+     *       resume                             resume
+     *       no action     data available       no action
+     *     |-------->READY<-------------->READ<--------|
+     *     |             ^  read finished              |
+     *  suspend          |                          suspend
+     *     |          resume                           |
+     *     |    register socket to poller (server)     |
+     *     |    resume data processing (client)        |
+     *     |             |                             |
+     *     v             |                             v
+     * READY_SUSPENDING  |                  READ_SUSPENDING
+     *     |             |                             |
+     * data available    |           read finished     |
+     *     |---------->SUSPENDED<----------------------|
+     * 
+ */ + protected enum ReadState { + READY (false), + READ (false), + READY_SUSPENDING(true), + READ_SUSPENDING (true), + SUSPENDED (true); + + private final boolean isSuspended; + + ReadState(boolean isSuspended) { + this.isSuspended = isSuspended; + } + + public boolean isSuspended() { + return isSuspended; + } + } + + @Override + public void suspend() { + while (true) { + switch (readState) { + case READY: + if (!READ_STATE_UPDATER.compareAndSet(this, ReadState.READY, + ReadState.READY_SUSPENDING)) { + continue; + } + return; + case READ: + if (!READ_STATE_UPDATER.compareAndSet(this, ReadState.READ, + ReadState.READ_SUSPENDING)) { + continue; + } + return; + case READY_SUSPENDING: + if (getLog().isWarnEnabled()) { + getLog().warn(sm.getString("wsFrame.suspendRequested")); + } + return; + case READ_SUSPENDING: + if (getLog().isWarnEnabled()) { + getLog().warn(sm.getString("wsFrame.suspendRequested")); + } + return; + case SUSPENDED: + if (getLog().isWarnEnabled()) { + getLog().warn(sm.getString("wsFrame.alreadySuspended")); + } + return; + default: + throw new IllegalStateException(sm.getString("wsFrame.illegalReadState", state)); + } + } + } + + @Override + public void resume() { + while (true) { + switch (readState) { + case READY: + if (getLog().isWarnEnabled()) { + getLog().warn(sm.getString("wsFrame.alreadyResumed")); + } + return; + case READ: + if (getLog().isWarnEnabled()) { + getLog().warn(sm.getString("wsFrame.alreadyResumed")); + } + return; + case READY_SUSPENDING: + if (!READ_STATE_UPDATER.compareAndSet(this, ReadState.READY_SUSPENDING, + ReadState.READY)) { + continue; + } + return; + case READ_SUSPENDING: + if (!READ_STATE_UPDATER.compareAndSet(this, ReadState.READ_SUSPENDING, + ReadState.READ)) { + continue; + } + return; + case SUSPENDED: + if (!READ_STATE_UPDATER.compareAndSet(this, ReadState.SUSPENDED, + ReadState.READY)) { + continue; + } + resumeProcessing(); + return; + default: + throw new IllegalStateException(sm.getString("wsFrame.illegalReadState", state)); + } + } + } + + protected boolean isSuspended() { + return readState.isSuspended(); + } + + protected ReadState getReadState() { + return readState; + } + + protected void changeReadState(ReadState newState) { + READ_STATE_UPDATER.set(this, newState); + } + + protected boolean changeReadState(ReadState oldState, ReadState newState) { + return READ_STATE_UPDATER.compareAndSet(this, oldState, newState); + } + + protected abstract void resumeProcessing(); + + private abstract class TerminalTransformation implements Transformation { @Override diff --git a/java/org/apache/tomcat/websocket/WsFrameClient.java b/java/org/apache/tomcat/websocket/WsFrameClient.java index 61a4185d849f..4fa2ac8dcf87 100644 --- a/java/org/apache/tomcat/websocket/WsFrameClient.java +++ b/java/org/apache/tomcat/websocket/WsFrameClient.java @@ -57,36 +57,60 @@ void startInputProcessing() { private void processSocketRead() throws IOException { + while (true) { + switch (getReadState()) { + case READY: + if (!changeReadState(ReadState.READY, ReadState.READ)) { + continue; + } + while (response.hasRemaining() && !isSuspended()) { + inputBuffer.mark(); + inputBuffer.position(inputBuffer.limit()).limit(inputBuffer.capacity()); - while (response.hasRemaining()) { - inputBuffer.mark(); - inputBuffer.position(inputBuffer.limit()).limit(inputBuffer.capacity()); - - int toCopy = Math.min(response.remaining(), inputBuffer.remaining()); + int toCopy = Math.min(response.remaining(), inputBuffer.remaining()); - // Copy remaining bytes read in HTTP phase to input buffer used by - // frame processing + // Copy remaining bytes read in HTTP phase to input buffer used by + // frame processing - int orgLimit = response.limit(); - response.limit(response.position() + toCopy); - inputBuffer.put(response); - response.limit(orgLimit); + int orgLimit = response.limit(); + response.limit(response.position() + toCopy); + inputBuffer.put(response); + response.limit(orgLimit); - inputBuffer.limit(inputBuffer.position()).reset(); + inputBuffer.limit(inputBuffer.position()).reset(); - // Process the data we have - processInputBuffer(); - } - response.clear(); + // Process the data we have + try { + processInputBuffer(); + } catch (IOException e) { + changeReadState(ReadState.READY); + throw e; + } + } + response.clear(); - // Get some more data - if (isOpen()) { - channel.read(response, null, handler); + // Get some more data + if (isOpen()) { + channel.read(response, null, handler); + } else { + changeReadState(ReadState.READY); + } + return; + case READY_SUSPENDING: + if (!changeReadState(ReadState.READY_SUSPENDING, ReadState.SUSPENDED)) { + continue; + } + return; + default: + throw new IllegalStateException( + sm.getString("wsFrameServer.illegalReadState", getReadState())); + } } } private final void close(Throwable t) { + changeReadState(ReadState.READY); CloseReason cr; if (t instanceof WsIOException) { cr = ((WsIOException) t).getCloseReason(); @@ -129,19 +153,7 @@ public void completed(Integer result, Void attachment) { return; } response.flip(); - try { - processSocketRead(); - } catch (IOException e) { - // Only send a close message on an IOException if the client - // has not yet received a close control message from the server - // as the IOException may be in response to the client - // continuing to send a message after the server sent a close - // control message. - if (isOpen()) { - log.debug(sm.getString("wsFrameClient.ioe"), e); - close(e); - } - } + doResumeProcessing(true); } @Override @@ -151,14 +163,59 @@ public void failed(Throwable exc, Void attachment) { response = ByteBuffer .allocate(((ReadBufferOverflowException) exc).getMinBufferSize()); response.flip(); - try { - processSocketRead(); - } catch (IOException e) { - close(e); - } + doResumeProcessing(false); } else { close(exc); } } + + private void doResumeProcessing(boolean checkOpenOnError) { + while (true) { + switch (getReadState()) { + case READ: + if (!changeReadState(ReadState.READ, ReadState.READY)) { + continue; + } + resumeProcessing(checkOpenOnError); + return; + case READ_SUSPENDING: + if (!changeReadState(ReadState.READ_SUSPENDING, ReadState.SUSPENDED)) { + continue; + } + return; + default: + throw new IllegalStateException( + sm.getString("wsFrame.illegalReadState", getReadState())); + } + } + } + } + + + @Override + protected void resumeProcessing() { + resumeProcessing(true); + } + + private void resumeProcessing(boolean checkOpenOnError) { + try { + processSocketRead(); + } catch (IOException e) { + if (checkOpenOnError) { + // Only send a close message on an IOException if the client + // has not yet received a close control message from the server + // as the IOException may be in response to the client + // continuing to send a message after the server sent a close + // control message. + if (isOpen()) { + if (log.isDebugEnabled()) { + log.debug(sm.getString("wsFrameClient.ioe"), e); + } + close(e); + } + } else { + close(e); + } + } } } diff --git a/java/org/apache/tomcat/websocket/WsSession.java b/java/org/apache/tomcat/websocket/WsSession.java index 766d35928f8a..4da927b92667 100644 --- a/java/org/apache/tomcat/websocket/WsSession.java +++ b/java/org/apache/tomcat/websocket/WsSession.java @@ -53,7 +53,7 @@ import org.apache.tomcat.util.ExceptionUtils; import org.apache.tomcat.util.res.StringManager; -public class WsSession implements Session { +public class WsSession implements Session, SuspendableMessageReceiver { // An ellipsis is a single character that looks like three periods in a row // and is used to indicate a continuation. @@ -792,4 +792,22 @@ private static enum State { OUTPUT_CLOSED, CLOSED } + + + private SuspendableMessageReceiver suspendableMessageReceiver; + void setSuspendableMessageReceiver(SuspendableMessageReceiver suspendableMessageReceiver) { + this.suspendableMessageReceiver = suspendableMessageReceiver; + } + + + @Override + public void suspend() { + suspendableMessageReceiver.suspend(); + } + + + @Override + public void resume() { + suspendableMessageReceiver.resume(); + } } diff --git a/java/org/apache/tomcat/websocket/server/LocalStrings.properties b/java/org/apache/tomcat/websocket/server/LocalStrings.properties index ef55abeae0aa..ca425e2d5db3 100644 --- a/java/org/apache/tomcat/websocket/server/LocalStrings.properties +++ b/java/org/apache/tomcat/websocket/server/LocalStrings.properties @@ -31,6 +31,7 @@ uriTemplate.invalidPath=The path [{0}] is not valid. uriTemplate.invalidSegment=The segment [{0}] is not valid in the provided path [{1}] wsFrameServer.bytesRead=Read [{0}] bytes into input buffer ready for processing +wsFrameServer.illegalReadState=Unexpected read state [{0}] wsFrameServer.onDataAvailable=Method entry wsHttpUpgradeHandler.closeOnError=Closing WebSocket connection due to an error diff --git a/java/org/apache/tomcat/websocket/server/WsFrameServer.java b/java/org/apache/tomcat/websocket/server/WsFrameServer.java index ceabbdb29ae6..aa9862f95958 100644 --- a/java/org/apache/tomcat/websocket/server/WsFrameServer.java +++ b/java/org/apache/tomcat/websocket/server/WsFrameServer.java @@ -22,6 +22,8 @@ import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; +import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; +import org.apache.tomcat.util.net.SocketEvent; import org.apache.tomcat.util.net.SocketWrapperBase; import org.apache.tomcat.util.res.StringManager; import org.apache.tomcat.websocket.Transformation; @@ -52,11 +54,18 @@ public WsFrameServer(SocketWrapperBase socketWrapper, WsSession wsSession, * @throws IOException if an I/O error occurs while processing the available * data */ - public void onDataAvailable() throws IOException { + private void onDataAvailable() throws IOException { if (log.isDebugEnabled()) { log.debug("wsFrameServer.onDataAvailable"); } - while (isOpen()) { + if (isOpen() && inputBuffer.hasRemaining() && !isSuspended()) { + // There might be a data that was left in the buffer when + // the read has been suspended. + // Consume this data before reading from the socket. + processInputBuffer(); + } + + while (isOpen() && !isSuspended()) { // Fill up the input buffer with as much data as we can inputBuffer.mark(); inputBuffer.position(inputBuffer.limit()).limit(inputBuffer.capacity()); @@ -124,4 +133,60 @@ protected void sendMessageBinary(ByteBuffer msg, boolean last) throws WsIOExcept Thread.currentThread().setContextClassLoader(cl); } } + + + @Override + protected void resumeProcessing() { + socketWrapper.processSocket(SocketEvent.OPEN_READ, true); + } + + SocketState notifyDataAvailable() throws IOException { + while (isOpen()) { + switch (getReadState()) { + case READY: + if (!changeReadState(ReadState.READY, ReadState.READ)) { + continue; + } + try { + return doOnDataAvailable(); + } catch (IOException e) { + changeReadState(ReadState.READY); + throw e; + } + case READY_SUSPENDING: + if (!changeReadState(ReadState.READY_SUSPENDING, ReadState.SUSPENDED)) { + continue; + } + return SocketState.SUSPENDED; + default: + throw new IllegalStateException( + sm.getString("wsFrameServer.illegalReadState", getReadState())); + } + } + + return SocketState.CLOSED; + } + + private SocketState doOnDataAvailable() throws IOException { + onDataAvailable(); + while (isOpen()) { + switch (getReadState()) { + case READ: + if (!changeReadState(ReadState.READ, ReadState.READY)) { + continue; + } + return SocketState.UPGRADED; + case READ_SUSPENDING: + if (!changeReadState(ReadState.READ_SUSPENDING, ReadState.SUSPENDED)) { + continue; + } + return SocketState.SUSPENDED; + default: + throw new IllegalStateException( + sm.getString("wsFrameServer.illegalReadState", getReadState())); + } + } + + return SocketState.CLOSED; + } } diff --git a/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java b/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java index 4b0622cd2007..c49b32ad2bd0 100644 --- a/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java +++ b/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java @@ -145,7 +145,7 @@ public SocketState upgradeDispatch(SocketEvent status) { switch (status) { case OPEN_READ: try { - wsFrame.onDataAvailable(); + return wsFrame.notifyDataAvailable(); } catch (WsIOException ws) { close(ws.getCloseReason()); } catch (IOException ioe) { diff --git a/test/org/apache/tomcat/websocket/TestWsSessionSuspendResume.java b/test/org/apache/tomcat/websocket/TestWsSessionSuspendResume.java new file mode 100644 index 000000000000..3d11ae52efc1 --- /dev/null +++ b/test/org/apache/tomcat/websocket/TestWsSessionSuspendResume.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.websocket; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.websocket.ClientEndpointConfig; +import javax.websocket.CloseReason; +import javax.websocket.ContainerProvider; +import javax.websocket.Endpoint; +import javax.websocket.EndpointConfig; +import javax.websocket.Session; +import javax.websocket.WebSocketContainer; +import javax.websocket.server.ServerEndpointConfig; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.catalina.Context; +import org.apache.catalina.servlets.DefaultServlet; +import org.apache.catalina.startup.Tomcat; +import org.apache.tomcat.websocket.TesterMessageCountClient.TesterProgrammaticEndpoint; +import org.apache.tomcat.websocket.server.TesterEndpointConfig; + +public class TestWsSessionSuspendResume extends WebSocketBaseTest { + + @Test + public void test() throws Exception { + Tomcat tomcat = getTomcatInstance(); + + Context ctx = tomcat.addContext("", null); + ctx.addApplicationListener(Config.class.getName()); + + Tomcat.addServlet(ctx, "default", new DefaultServlet()); + ctx.addServletMappingDecoded("/", "default"); + + tomcat.start(); + + WebSocketContainer wsContainer = ContainerProvider.getWebSocketContainer(); + + ClientEndpointConfig clientEndpointConfig = ClientEndpointConfig.Builder.create().build(); + Session wsSession = wsContainer.connectToServer( + TesterProgrammaticEndpoint.class, + clientEndpointConfig, + new URI("ws://localhost:" + getPort() + Config.PATH)); + + CountDownLatch latch = new CountDownLatch(2); + wsSession.addMessageHandler(String.class, message -> { + Assert.assertTrue("[echo, echo, echo]".equals(message)); + latch.countDown(); + }); + for (int i = 0; i < 8; i++) { + wsSession.getAsyncRemote().sendText("echo"); + } + + boolean latchResult = latch.await(30, TimeUnit.SECONDS); + Assert.assertTrue(latchResult); + + wsSession.close(); + } + + + public static final class Config extends TesterEndpointConfig { + private static final String PATH = "/echo"; + + @Override + protected Class getEndpointClass() { + return SuspendResumeEndpoint.class; + } + + @Override + protected ServerEndpointConfig getServerEndpointConfig() { + return ServerEndpointConfig.Builder.create(getEndpointClass(), PATH).build(); + } + } + + + public static final class SuspendResumeEndpoint extends Endpoint { + + @Override + public void onOpen(Session session, EndpointConfig epc) { + MessageProcessor processor = new MessageProcessor(session, 3); + session.addMessageHandler(String.class, message -> processor.addMessage(message)); + } + + @Override + public void onClose(Session session, CloseReason closeReason) { + try { + session.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void onError(Session session, Throwable t) { + t.printStackTrace(); + } + } + + + private static final class MessageProcessor { + private final Session session; + private final int count; + private final List messages = new ArrayList<>(); + + MessageProcessor(Session session, int count) { + this.session = session; + this.count = count; + } + + void addMessage(String message) { + if (messages.size() == count) { + ((SuspendableMessageReceiver) session).suspend(); + session.getAsyncRemote().sendText(messages.toString(), result -> { + ((SuspendableMessageReceiver) session).resume(); + Assert.assertTrue(result.isOK()); + }); + messages.clear(); + } else { + messages.add(message); + } + } + } +} \ No newline at end of file From 58a4ead199cd9889b62dbe3bad075a42b7065557 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Tue, 25 Apr 2017 10:16:05 +0300 Subject: [PATCH 02/11] Ensure the response data will not be lost when the read is suspended (client use case) --- .../org/apache/tomcat/websocket/WsFrameClient.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/java/org/apache/tomcat/websocket/WsFrameClient.java b/java/org/apache/tomcat/websocket/WsFrameClient.java index 4fa2ac8dcf87..9654c921c472 100644 --- a/java/org/apache/tomcat/websocket/WsFrameClient.java +++ b/java/org/apache/tomcat/websocket/WsFrameClient.java @@ -63,7 +63,19 @@ private void processSocketRead() throws IOException { if (!changeReadState(ReadState.READY, ReadState.READ)) { continue; } - while (response.hasRemaining() && !isSuspended()) { + while (response.hasRemaining()) { + if (isSuspended()) { + if (!changeReadState(ReadState.READ_SUSPENDING, ReadState.SUSPENDED)) { + continue; + } + // There is still data available in the response buffer + // Return here so that the response buffer will not be + // cleared and there will be no data read from the + // socket. Thus when the read operation is resumed first + // the data left in the response buffer will be consumed + // and then a new socket read will be performed + return; + } inputBuffer.mark(); inputBuffer.position(inputBuffer.limit()).limit(inputBuffer.capacity()); From a6e79d9e997c5c3091ac4226ddb282be701c5179 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Tue, 25 Apr 2017 12:16:22 +0300 Subject: [PATCH 03/11] Check in all switch cases whether the state is the expected one (for methods suspend() and resume()) --- .../apache/tomcat/websocket/WsFrameBase.java | 40 ++++++++++++++----- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/java/org/apache/tomcat/websocket/WsFrameBase.java b/java/org/apache/tomcat/websocket/WsFrameBase.java index 40bf023a6fcf..75dd8d6951b2 100644 --- a/java/org/apache/tomcat/websocket/WsFrameBase.java +++ b/java/org/apache/tomcat/websocket/WsFrameBase.java @@ -755,18 +755,30 @@ public void suspend() { } return; case READY_SUSPENDING: - if (getLog().isWarnEnabled()) { - getLog().warn(sm.getString("wsFrame.suspendRequested")); + if (readState != ReadState.READY_SUSPENDING) { + continue; + } else { + if (getLog().isWarnEnabled()) { + getLog().warn(sm.getString("wsFrame.suspendRequested")); + } } return; case READ_SUSPENDING: - if (getLog().isWarnEnabled()) { - getLog().warn(sm.getString("wsFrame.suspendRequested")); + if (readState != ReadState.READ_SUSPENDING) { + continue; + } else { + if (getLog().isWarnEnabled()) { + getLog().warn(sm.getString("wsFrame.suspendRequested")); + } } return; case SUSPENDED: - if (getLog().isWarnEnabled()) { - getLog().warn(sm.getString("wsFrame.alreadySuspended")); + if (readState != ReadState.SUSPENDED) { + continue; + } else { + if (getLog().isWarnEnabled()) { + getLog().warn(sm.getString("wsFrame.alreadySuspended")); + } } return; default: @@ -780,13 +792,21 @@ public void resume() { while (true) { switch (readState) { case READY: - if (getLog().isWarnEnabled()) { - getLog().warn(sm.getString("wsFrame.alreadyResumed")); + if (readState != ReadState.READY) { + continue; + } else { + if (getLog().isWarnEnabled()) { + getLog().warn(sm.getString("wsFrame.alreadyResumed")); + } } return; case READ: - if (getLog().isWarnEnabled()) { - getLog().warn(sm.getString("wsFrame.alreadyResumed")); + if (readState != ReadState.READ) { + continue; + } else { + if (getLog().isWarnEnabled()) { + getLog().warn(sm.getString("wsFrame.alreadyResumed")); + } } return; case READY_SUSPENDING: From 21d7b4f2294db094bf36bb61ea48e6434b49f059 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Tue, 25 Apr 2017 12:22:13 +0300 Subject: [PATCH 04/11] Add javadoc --- java/org/apache/tomcat/websocket/WsFrameBase.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/java/org/apache/tomcat/websocket/WsFrameBase.java b/java/org/apache/tomcat/websocket/WsFrameBase.java index 75dd8d6951b2..5499751b19d4 100644 --- a/java/org/apache/tomcat/websocket/WsFrameBase.java +++ b/java/org/apache/tomcat/websocket/WsFrameBase.java @@ -850,6 +850,13 @@ protected boolean changeReadState(ReadState oldState, ReadState newState) { return READ_STATE_UPDATER.compareAndSet(this, oldState, newState); } + /** + * This method will be invoked when the read operation is resumed. + * As the suspend of the read operation can be invoked at any time, when + * implementing this method one should consider that there might still be + * data remaining into the internal buffers that needs to be processed + * before reading again from the socket. + */ protected abstract void resumeProcessing(); From d92257ef79fb52ae37b3c7c94ae1510824eff615 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Tue, 25 Apr 2017 13:40:26 +0300 Subject: [PATCH 05/11] Remove SuspendableMessageReceiver interface --- .../websocket/SuspendableMessageReceiver.java | 24 ------------------- .../apache/tomcat/websocket/WsFrameBase.java | 6 ++--- .../apache/tomcat/websocket/WsSession.java | 14 +++++------ .../websocket/TestWsSessionSuspendResume.java | 4 ++-- 4 files changed, 10 insertions(+), 38 deletions(-) delete mode 100644 java/org/apache/tomcat/websocket/SuspendableMessageReceiver.java diff --git a/java/org/apache/tomcat/websocket/SuspendableMessageReceiver.java b/java/org/apache/tomcat/websocket/SuspendableMessageReceiver.java deleted file mode 100644 index fd0e971bb587..000000000000 --- a/java/org/apache/tomcat/websocket/SuspendableMessageReceiver.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tomcat.websocket; - -public interface SuspendableMessageReceiver { - - void suspend(); - - void resume(); -} diff --git a/java/org/apache/tomcat/websocket/WsFrameBase.java b/java/org/apache/tomcat/websocket/WsFrameBase.java index 5499751b19d4..08ed3c290240 100644 --- a/java/org/apache/tomcat/websocket/WsFrameBase.java +++ b/java/org/apache/tomcat/websocket/WsFrameBase.java @@ -41,7 +41,7 @@ * extracts the messages. WebSocket Pings received will be responded to * automatically without any action required by the application. */ -public abstract class WsFrameBase implements SuspendableMessageReceiver { +public abstract class WsFrameBase { private static final StringManager sm = StringManager.getManager(WsFrameBase.class); @@ -94,7 +94,7 @@ public WsFrameBase(WsSession wsSession, Transformation transformation) { inputBuffer.position(0).limit(0); messageBufferBinary = ByteBuffer.allocate(wsSession.getMaxBinaryMessageBufferSize()); messageBufferText = CharBuffer.allocate(wsSession.getMaxTextMessageBufferSize()); - wsSession.setSuspendableMessageReceiver(this); + wsSession.setWsFrame(this); this.wsSession = wsSession; Transformation finalTransformation; if (isMasked()) { @@ -738,7 +738,6 @@ public boolean isSuspended() { } } - @Override public void suspend() { while (true) { switch (readState) { @@ -787,7 +786,6 @@ public void suspend() { } } - @Override public void resume() { while (true) { switch (readState) { diff --git a/java/org/apache/tomcat/websocket/WsSession.java b/java/org/apache/tomcat/websocket/WsSession.java index 4da927b92667..a76ae55d5d45 100644 --- a/java/org/apache/tomcat/websocket/WsSession.java +++ b/java/org/apache/tomcat/websocket/WsSession.java @@ -53,7 +53,7 @@ import org.apache.tomcat.util.ExceptionUtils; import org.apache.tomcat.util.res.StringManager; -public class WsSession implements Session, SuspendableMessageReceiver { +public class WsSession implements Session { // An ellipsis is a single character that looks like three periods in a row // and is used to indicate a continuation. @@ -794,20 +794,18 @@ private static enum State { } - private SuspendableMessageReceiver suspendableMessageReceiver; - void setSuspendableMessageReceiver(SuspendableMessageReceiver suspendableMessageReceiver) { - this.suspendableMessageReceiver = suspendableMessageReceiver; + private WsFrameBase wsFrame; + void setWsFrame(WsFrameBase wsFrame) { + this.wsFrame = wsFrame; } - @Override public void suspend() { - suspendableMessageReceiver.suspend(); + wsFrame.suspend(); } - @Override public void resume() { - suspendableMessageReceiver.resume(); + wsFrame.resume(); } } diff --git a/test/org/apache/tomcat/websocket/TestWsSessionSuspendResume.java b/test/org/apache/tomcat/websocket/TestWsSessionSuspendResume.java index 3d11ae52efc1..8af867982003 100644 --- a/test/org/apache/tomcat/websocket/TestWsSessionSuspendResume.java +++ b/test/org/apache/tomcat/websocket/TestWsSessionSuspendResume.java @@ -130,9 +130,9 @@ private static final class MessageProcessor { void addMessage(String message) { if (messages.size() == count) { - ((SuspendableMessageReceiver) session).suspend(); + ((WsSession) session).suspend(); session.getAsyncRemote().sendText(messages.toString(), result -> { - ((SuspendableMessageReceiver) session).resume(); + ((WsSession) session).resume(); Assert.assertTrue(result.isOK()); }); messages.clear(); From 4d9dd25ae652d6822e1b279d2916dc5217bae414 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Thu, 27 Apr 2017 21:20:16 +0300 Subject: [PATCH 06/11] ws police --- java/org/apache/tomcat/websocket/WsFrameBase.java | 2 +- java/org/apache/tomcat/websocket/WsFrameClient.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/java/org/apache/tomcat/websocket/WsFrameBase.java b/java/org/apache/tomcat/websocket/WsFrameBase.java index 08ed3c290240..27bbc03718ed 100644 --- a/java/org/apache/tomcat/websocket/WsFrameBase.java +++ b/java/org/apache/tomcat/websocket/WsFrameBase.java @@ -702,7 +702,7 @@ private static enum State { * available, socket registered to the poller (server case) * SUSPENDED - suspended, read operation finished/notification for data * available received - * + * *
      *       resume                             resume
      *       no action     data available       no action
diff --git a/java/org/apache/tomcat/websocket/WsFrameClient.java b/java/org/apache/tomcat/websocket/WsFrameClient.java
index 9654c921c472..62391b9d3a3e 100644
--- a/java/org/apache/tomcat/websocket/WsFrameClient.java
+++ b/java/org/apache/tomcat/websocket/WsFrameClient.java
@@ -224,7 +224,7 @@ private void resumeProcessing(boolean checkOpenOnError) {
                         log.debug(sm.getString("wsFrameClient.ioe"), e);
                     }
                     close(e);
-                } 
+                }
             } else {
                 close(e);
             }

From 031a611e9ee179ebd0b63c18d811c7af34b19394 Mon Sep 17 00:00:00 2001
From: Violeta Georgieva 
Date: Thu, 27 Apr 2017 21:42:20 +0300
Subject: [PATCH 07/11] Use separate lines for transitions instead of
 bi-directional

---
 .../apache/tomcat/websocket/WsFrameBase.java  | 24 +++++++++++--------
 1 file changed, 14 insertions(+), 10 deletions(-)

diff --git a/java/org/apache/tomcat/websocket/WsFrameBase.java b/java/org/apache/tomcat/websocket/WsFrameBase.java
index 27bbc03718ed..6443c1111191 100644
--- a/java/org/apache/tomcat/websocket/WsFrameBase.java
+++ b/java/org/apache/tomcat/websocket/WsFrameBase.java
@@ -704,16 +704,20 @@ private static enum State {
      * available received
      *
      * 
-     *       resume                             resume
-     *       no action     data available       no action
-     *     |-------->READY<-------------->READ<--------|
-     *     |             ^  read finished              |
-     *  suspend          |                          suspend
-     *     |          resume                           |
-     *     |    register socket to poller (server)     |
-     *     |    resume data processing (client)        |
-     *     |             |                             |
-     *     v             |                             v
+     *     resume                             resume
+     *     no action        data available    no action
+     *  |---------------|  |--------------| |----------|
+     *  |               v  |              v v          |
+     *  |  |-----------READY<-------------READ------|  |
+     *  |  |             ^   read finished          |  |
+     *  |  |             |                          |  |
+     *  | suspend        |                     suspend |
+     *  |  |             |                          |  |
+     *  |  |          resume                        |  |
+     *  |  |    register socket to poller (server)  |  |
+     *  |  |    resume data processing (client)     |  |
+     *  |  |             |                          |  |
+     *  |  v             |                          v  |
      * READY_SUSPENDING  |                  READ_SUSPENDING
      *     |             |                             |
      * data available    |           read finished     |

From 41670096008697abe57fccc8ae26f4c89e7f7666 Mon Sep 17 00:00:00 2001
From: Violeta Georgieva 
Date: Thu, 27 Apr 2017 22:09:06 +0300
Subject: [PATCH 08/11] It is not needed to change the state as the close
 method will do it

---
 java/org/apache/tomcat/websocket/WsFrameClient.java | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/java/org/apache/tomcat/websocket/WsFrameClient.java b/java/org/apache/tomcat/websocket/WsFrameClient.java
index 62391b9d3a3e..846bde177a1e 100644
--- a/java/org/apache/tomcat/websocket/WsFrameClient.java
+++ b/java/org/apache/tomcat/websocket/WsFrameClient.java
@@ -92,12 +92,7 @@ private void processSocketRead() throws IOException {
                     inputBuffer.limit(inputBuffer.position()).reset();
 
                     // Process the data we have
-                    try {
-                        processInputBuffer();
-                    } catch (IOException e) {
-                        changeReadState(ReadState.READY);
-                        throw e;
-                    }
+                    processInputBuffer();
                 }
                 response.clear();
 

From e5a400aea358c519702461ca64064cc4101f4767 Mon Sep 17 00:00:00 2001
From: Violeta Georgieva 
Date: Fri, 28 Apr 2017 00:31:19 +0300
Subject: [PATCH 09/11] Document the states in the state diagram

---
 .../apache/tomcat/websocket/WsFrameBase.java  | 50 +++++++++++++------
 1 file changed, 35 insertions(+), 15 deletions(-)

diff --git a/java/org/apache/tomcat/websocket/WsFrameBase.java b/java/org/apache/tomcat/websocket/WsFrameBase.java
index 6443c1111191..6828e6f343b2 100644
--- a/java/org/apache/tomcat/websocket/WsFrameBase.java
+++ b/java/org/apache/tomcat/websocket/WsFrameBase.java
@@ -694,20 +694,40 @@ private static enum State {
 
 
     /**
-     * READY - not suspended, waiting for notification for data available,
-     * socket registered to the poller (server case)
-     * READ - reading the available data, not suspended
-     * READ_SUSPENDING - suspended, finishing read operation
-     * READY_SUSPENDING - suspended, waiting for notification for data
-     * available, socket registered to the poller (server case)
-     * SUSPENDED - suspended, read operation finished/notification for data
-     * available received
+     * READY            - not suspended
+     *                    Server case: waiting for a notification that data is
+     *                    ready to be read from the socket, socket registered
+     *                    to the poller
+     *                    Client case: data has been read from the socket and
+     *                    is available for processing
+     * READ             - not suspended
+     *                    Server case: reading from the socket and processing
+     *                    data
+     *                    Client case: processing the data if such has already
+     *                    been read and more data will be read from the socket
+     * READ_SUSPENDING  - suspended, a call to suspend() was made while in READ
+     *                    state. A call to resume() will do nothing and will
+     *                    transition to READ state.
+     * READY_SUSPENDING - suspended, a call to suspend() was made while in READY
+     *                    state. A call to resume() will do nothing and will
+     *                    transition to READY state.
+     * SUSPENDED        - suspended
+     *                    Server case: read finished (READ_SUSPENDING) /
+     *                    a notification was received that data is ready to be
+     *                    read from the socket (READY_SUSPENDING), socket is
+     *                    not registered to the poller
+     *                    Client case: read finished (READ_SUSPENDING) / data
+     *                    has been read from the socket and is available for
+     *                    processing (READY_SUSPENDING)
+     *                    A call to resume() will:
+     *                    Server case: register the socket to the poller
+     *                    Client case: resume data processing
      *
      * 
-     *     resume                             resume
-     *     no action        data available    no action
-     *  |---------------|  |--------------| |----------|
-     *  |               v  |              v v          |
+     *     resume           data to be        resume
+     *     no action        processed         no action
+     *  |---------------| |---------------| |----------|
+     *  |               v |               v v          |
      *  |  |-----------READY<-------------READ------|  |
      *  |  |             ^   read finished          |  |
      *  |  |             |                          |  |
@@ -719,9 +739,9 @@ private static enum State {
      *  |  |             |                          |  |
      *  |  v             |                          v  |
      * READY_SUSPENDING  |                  READ_SUSPENDING
-     *     |             |                             |
-     * data available    |           read finished     |
-     *     |---------->SUSPENDED<----------------------|
+     *  |                |                             |
+     *  | data available |           read finished     |
+     *  |------------->SUSPENDED<----------------------|
      * 
*/ protected enum ReadState { From 120bf7a74c36ee704ac62d17a6a50c90fec11b08 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Fri, 28 Apr 2017 09:52:06 +0300 Subject: [PATCH 10/11] Introduce CLOSING state to indicate that close message will be send and suspend/resume are not possible anymore. --- java/org/apache/tomcat/websocket/WsFrameBase.java | 8 +++++++- java/org/apache/tomcat/websocket/WsFrameClient.java | 4 ++-- .../org/apache/tomcat/websocket/server/WsFrameServer.java | 2 +- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/java/org/apache/tomcat/websocket/WsFrameBase.java b/java/org/apache/tomcat/websocket/WsFrameBase.java index 6828e6f343b2..c218fe4a0978 100644 --- a/java/org/apache/tomcat/websocket/WsFrameBase.java +++ b/java/org/apache/tomcat/websocket/WsFrameBase.java @@ -722,6 +722,7 @@ private static enum State { * A call to resume() will: * Server case: register the socket to the poller * Client case: resume data processing + * CLOSING - not suspended, a close will be send * *
      *     resume           data to be        resume
@@ -749,7 +750,8 @@ protected enum ReadState {
         READ            (false),
         READY_SUSPENDING(true),
         READ_SUSPENDING (true),
-        SUSPENDED       (true);
+        SUSPENDED       (true),
+        CLOSING         (false);
 
         private final boolean isSuspended;
 
@@ -804,6 +806,8 @@ public void suspend() {
                     }
                 }
                 return;
+            case CLOSING:
+                return;
             default:
                 throw new IllegalStateException(sm.getString("wsFrame.illegalReadState", state));
             }
@@ -850,6 +854,8 @@ public void resume() {
                 }
                 resumeProcessing();
                 return;
+            case CLOSING:
+                return;
             default:
                 throw new IllegalStateException(sm.getString("wsFrame.illegalReadState", state));
             }
diff --git a/java/org/apache/tomcat/websocket/WsFrameClient.java b/java/org/apache/tomcat/websocket/WsFrameClient.java
index 846bde177a1e..a400a8ccf4ed 100644
--- a/java/org/apache/tomcat/websocket/WsFrameClient.java
+++ b/java/org/apache/tomcat/websocket/WsFrameClient.java
@@ -100,7 +100,7 @@ private void processSocketRead() throws IOException {
                 if (isOpen()) {
                     channel.read(response, null, handler);
                 } else {
-                    changeReadState(ReadState.READY);
+                    changeReadState(ReadState.CLOSING);
                 }
                 return;
             case READY_SUSPENDING:
@@ -117,7 +117,7 @@ private void processSocketRead() throws IOException {
 
 
     private final void close(Throwable t) {
-        changeReadState(ReadState.READY);
+        changeReadState(ReadState.CLOSING);
         CloseReason cr;
         if (t instanceof WsIOException) {
             cr = ((WsIOException) t).getCloseReason();
diff --git a/java/org/apache/tomcat/websocket/server/WsFrameServer.java b/java/org/apache/tomcat/websocket/server/WsFrameServer.java
index aa9862f95958..897610a9daae 100644
--- a/java/org/apache/tomcat/websocket/server/WsFrameServer.java
+++ b/java/org/apache/tomcat/websocket/server/WsFrameServer.java
@@ -150,7 +150,7 @@ SocketState notifyDataAvailable() throws IOException {
                 try {
                     return doOnDataAvailable();
                 } catch (IOException e) {
-                    changeReadState(ReadState.READY);
+                    changeReadState(ReadState.CLOSING);
                     throw e;
                 }
             case READY_SUSPENDING:

From 4ddcaa2cd8a966e2b1081c4f5d9910d2afe38f03 Mon Sep 17 00:00:00 2001
From: Violeta Georgieva 
Date: Fri, 28 Apr 2017 21:20:56 +0300
Subject: [PATCH 11/11] Rename states READY -> WAITING READ -> PROCESSING
 READY_SUSPENDING -> SUSPENDING_WAIT READ_SUSPENDING -> SUSPENDING_PROCESSING

---
 .../apache/tomcat/websocket/WsFrameBase.java  | 126 +++++++++---------
 .../tomcat/websocket/WsFrameClient.java       |  18 +--
 .../websocket/server/WsFrameServer.java       |  16 +--
 3 files changed, 82 insertions(+), 78 deletions(-)

diff --git a/java/org/apache/tomcat/websocket/WsFrameBase.java b/java/org/apache/tomcat/websocket/WsFrameBase.java
index c218fe4a0978..c2189b4c5980 100644
--- a/java/org/apache/tomcat/websocket/WsFrameBase.java
+++ b/java/org/apache/tomcat/websocket/WsFrameBase.java
@@ -87,7 +87,7 @@ public abstract class WsFrameBase {
 
     private static final AtomicReferenceFieldUpdater READ_STATE_UPDATER =
             AtomicReferenceFieldUpdater.newUpdater(WsFrameBase.class, ReadState.class, "readState");
-    private volatile ReadState readState = ReadState.READY;
+    private volatile ReadState readState = ReadState.WAITING;
 
     public WsFrameBase(WsSession wsSession, Transformation transformation) {
         inputBuffer = ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE);
@@ -694,43 +694,47 @@ private static enum State {
 
 
     /**
-     * READY            - not suspended
-     *                    Server case: waiting for a notification that data is
-     *                    ready to be read from the socket, socket registered
-     *                    to the poller
-     *                    Client case: data has been read from the socket and
-     *                    is available for processing
-     * READ             - not suspended
-     *                    Server case: reading from the socket and processing
-     *                    data
-     *                    Client case: processing the data if such has already
-     *                    been read and more data will be read from the socket
-     * READ_SUSPENDING  - suspended, a call to suspend() was made while in READ
-     *                    state. A call to resume() will do nothing and will
-     *                    transition to READ state.
-     * READY_SUSPENDING - suspended, a call to suspend() was made while in READY
-     *                    state. A call to resume() will do nothing and will
-     *                    transition to READY state.
-     * SUSPENDED        - suspended
-     *                    Server case: read finished (READ_SUSPENDING) /
-     *                    a notification was received that data is ready to be
-     *                    read from the socket (READY_SUSPENDING), socket is
-     *                    not registered to the poller
-     *                    Client case: read finished (READ_SUSPENDING) / data
-     *                    has been read from the socket and is available for
-     *                    processing (READY_SUSPENDING)
-     *                    A call to resume() will:
-     *                    Server case: register the socket to the poller
-     *                    Client case: resume data processing
-     * CLOSING          - not suspended, a close will be send
+     * WAITING            - not suspended
+     *                      Server case: waiting for a notification that data
+     *                      is ready to be read from the socket, the socket is
+     *                      registered to the poller
+     *                      Client case: data has been read from the socket and
+     *                      is waiting for data to be processed
+     * PROCESSING         - not suspended
+     *                      Server case: reading from the socket and processing
+     *                      the data
+     *                      Client case: processing the data if such has
+     *                      already been read and more data will be read from
+     *                      the socket
+     * SUSPENDING_WAIT    - suspended, a call to suspend() was made while in
+     *                      WAITING state. A call to resume() will do nothing
+     *                      and will transition to WAITING state
+     * SUSPENDING_PROCESS - suspended, a call to suspend() was made while in
+     *                      PROCESSING state. A call to resume() will do
+     *                      nothing and will transition to PROCESSING state
+     * SUSPENDED          - suspended
+     *                      Server case: processing data finished
+     *                      (SUSPENDING_PROCESS) / a notification was received
+     *                      that data is ready to be read from the socket
+     *                      (SUSPENDING_WAIT), socket is not registered to the
+     *                      poller
+     *                      Client case: processing data finished
+     *                      (SUSPENDING_PROCESS) / data has been read from the
+     *                      socket and is available for processing
+     *                      (SUSPENDING_WAIT)
+     *                      A call to resume() will:
+     *                      Server case: register the socket to the poller
+     *                      Client case: resume data processing
+     * CLOSING            - not suspended, a close will be send
      *
      * 
      *     resume           data to be        resume
      *     no action        processed         no action
      *  |---------------| |---------------| |----------|
      *  |               v |               v v          |
-     *  |  |-----------READY<-------------READ------|  |
-     *  |  |             ^   read finished          |  |
+     *  |  |----------WAITING<--------PROCESSING----|  |
+     *  |  |             ^   processing             |  |
+     *  |  |             |   finished               |  |
      *  |  |             |                          |  |
      *  | suspend        |                     suspend |
      *  |  |             |                          |  |
@@ -739,19 +743,19 @@ private static enum State {
      *  |  |    resume data processing (client)     |  |
      *  |  |             |                          |  |
      *  |  v             |                          v  |
-     * READY_SUSPENDING  |                  READ_SUSPENDING
+     * SUSPENDING_WAIT   |                  SUSPENDING_PROCESS
      *  |                |                             |
-     *  | data available |           read finished     |
+     *  | data available |        processing finished  |
      *  |------------->SUSPENDED<----------------------|
      * 
*/ protected enum ReadState { - READY (false), - READ (false), - READY_SUSPENDING(true), - READ_SUSPENDING (true), - SUSPENDED (true), - CLOSING (false); + WAITING (false), + PROCESSING (false), + SUSPENDING_WAIT (true), + SUSPENDING_PROCESS(true), + SUSPENDED (true), + CLOSING (false); private final boolean isSuspended; @@ -767,20 +771,20 @@ public boolean isSuspended() { public void suspend() { while (true) { switch (readState) { - case READY: - if (!READ_STATE_UPDATER.compareAndSet(this, ReadState.READY, - ReadState.READY_SUSPENDING)) { + case WAITING: + if (!READ_STATE_UPDATER.compareAndSet(this, ReadState.WAITING, + ReadState.SUSPENDING_WAIT)) { continue; } return; - case READ: - if (!READ_STATE_UPDATER.compareAndSet(this, ReadState.READ, - ReadState.READ_SUSPENDING)) { + case PROCESSING: + if (!READ_STATE_UPDATER.compareAndSet(this, ReadState.PROCESSING, + ReadState.SUSPENDING_PROCESS)) { continue; } return; - case READY_SUSPENDING: - if (readState != ReadState.READY_SUSPENDING) { + case SUSPENDING_WAIT: + if (readState != ReadState.SUSPENDING_WAIT) { continue; } else { if (getLog().isWarnEnabled()) { @@ -788,8 +792,8 @@ public void suspend() { } } return; - case READ_SUSPENDING: - if (readState != ReadState.READ_SUSPENDING) { + case SUSPENDING_PROCESS: + if (readState != ReadState.SUSPENDING_PROCESS) { continue; } else { if (getLog().isWarnEnabled()) { @@ -817,8 +821,8 @@ public void suspend() { public void resume() { while (true) { switch (readState) { - case READY: - if (readState != ReadState.READY) { + case WAITING: + if (readState != ReadState.WAITING) { continue; } else { if (getLog().isWarnEnabled()) { @@ -826,8 +830,8 @@ public void resume() { } } return; - case READ: - if (readState != ReadState.READ) { + case PROCESSING: + if (readState != ReadState.PROCESSING) { continue; } else { if (getLog().isWarnEnabled()) { @@ -835,21 +839,21 @@ public void resume() { } } return; - case READY_SUSPENDING: - if (!READ_STATE_UPDATER.compareAndSet(this, ReadState.READY_SUSPENDING, - ReadState.READY)) { + case SUSPENDING_WAIT: + if (!READ_STATE_UPDATER.compareAndSet(this, ReadState.SUSPENDING_WAIT, + ReadState.WAITING)) { continue; } return; - case READ_SUSPENDING: - if (!READ_STATE_UPDATER.compareAndSet(this, ReadState.READ_SUSPENDING, - ReadState.READ)) { + case SUSPENDING_PROCESS: + if (!READ_STATE_UPDATER.compareAndSet(this, ReadState.SUSPENDING_PROCESS, + ReadState.PROCESSING)) { continue; } return; case SUSPENDED: if (!READ_STATE_UPDATER.compareAndSet(this, ReadState.SUSPENDED, - ReadState.READY)) { + ReadState.WAITING)) { continue; } resumeProcessing(); diff --git a/java/org/apache/tomcat/websocket/WsFrameClient.java b/java/org/apache/tomcat/websocket/WsFrameClient.java index a400a8ccf4ed..5e0d1b6a14c1 100644 --- a/java/org/apache/tomcat/websocket/WsFrameClient.java +++ b/java/org/apache/tomcat/websocket/WsFrameClient.java @@ -59,13 +59,13 @@ void startInputProcessing() { private void processSocketRead() throws IOException { while (true) { switch (getReadState()) { - case READY: - if (!changeReadState(ReadState.READY, ReadState.READ)) { + case WAITING: + if (!changeReadState(ReadState.WAITING, ReadState.PROCESSING)) { continue; } while (response.hasRemaining()) { if (isSuspended()) { - if (!changeReadState(ReadState.READ_SUSPENDING, ReadState.SUSPENDED)) { + if (!changeReadState(ReadState.SUSPENDING_PROCESS, ReadState.SUSPENDED)) { continue; } // There is still data available in the response buffer @@ -103,8 +103,8 @@ private void processSocketRead() throws IOException { changeReadState(ReadState.CLOSING); } return; - case READY_SUSPENDING: - if (!changeReadState(ReadState.READY_SUSPENDING, ReadState.SUSPENDED)) { + case SUSPENDING_WAIT: + if (!changeReadState(ReadState.SUSPENDING_WAIT, ReadState.SUSPENDED)) { continue; } return; @@ -179,14 +179,14 @@ public void failed(Throwable exc, Void attachment) { private void doResumeProcessing(boolean checkOpenOnError) { while (true) { switch (getReadState()) { - case READ: - if (!changeReadState(ReadState.READ, ReadState.READY)) { + case PROCESSING: + if (!changeReadState(ReadState.PROCESSING, ReadState.WAITING)) { continue; } resumeProcessing(checkOpenOnError); return; - case READ_SUSPENDING: - if (!changeReadState(ReadState.READ_SUSPENDING, ReadState.SUSPENDED)) { + case SUSPENDING_PROCESS: + if (!changeReadState(ReadState.SUSPENDING_PROCESS, ReadState.SUSPENDED)) { continue; } return; diff --git a/java/org/apache/tomcat/websocket/server/WsFrameServer.java b/java/org/apache/tomcat/websocket/server/WsFrameServer.java index 897610a9daae..6b7fafa13b07 100644 --- a/java/org/apache/tomcat/websocket/server/WsFrameServer.java +++ b/java/org/apache/tomcat/websocket/server/WsFrameServer.java @@ -143,8 +143,8 @@ protected void resumeProcessing() { SocketState notifyDataAvailable() throws IOException { while (isOpen()) { switch (getReadState()) { - case READY: - if (!changeReadState(ReadState.READY, ReadState.READ)) { + case WAITING: + if (!changeReadState(ReadState.WAITING, ReadState.PROCESSING)) { continue; } try { @@ -153,8 +153,8 @@ SocketState notifyDataAvailable() throws IOException { changeReadState(ReadState.CLOSING); throw e; } - case READY_SUSPENDING: - if (!changeReadState(ReadState.READY_SUSPENDING, ReadState.SUSPENDED)) { + case SUSPENDING_WAIT: + if (!changeReadState(ReadState.SUSPENDING_WAIT, ReadState.SUSPENDED)) { continue; } return SocketState.SUSPENDED; @@ -171,13 +171,13 @@ private SocketState doOnDataAvailable() throws IOException { onDataAvailable(); while (isOpen()) { switch (getReadState()) { - case READ: - if (!changeReadState(ReadState.READ, ReadState.READY)) { + case PROCESSING: + if (!changeReadState(ReadState.PROCESSING, ReadState.WAITING)) { continue; } return SocketState.UPGRADED; - case READ_SUSPENDING: - if (!changeReadState(ReadState.READ_SUSPENDING, ReadState.SUSPENDED)) { + case SUSPENDING_PROCESS: + if (!changeReadState(ReadState.SUSPENDING_PROCESS, ReadState.SUSPENDED)) { continue; } return SocketState.SUSPENDED;