Skip to content

Commit

Permalink
fix flaky test ClientCloseTest.testWriteException
Browse files Browse the repository at this point in the history
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
  • Loading branch information
lachlan-roberts authored and gregw committed May 1, 2019
1 parent c33f3a7 commit bb8e555
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 21 deletions.
Expand Up @@ -19,11 +19,11 @@
package org.eclipse.jetty.websocket.tests.client;


import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -36,6 +36,7 @@
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.CloseException;
Expand Down Expand Up @@ -66,11 +67,13 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ClientCloseTest
{
private Server server;
private WebSocketClient client;
private BlockingArrayQueue<ServerEndpoint> serverEndpoints = new BlockingArrayQueue<>();

private Session confirmConnection(CloseTrackingEndpoint clientSocket, Future<Session> clientFuture) throws Exception
{
Expand Down Expand Up @@ -128,7 +131,12 @@ public void configure(WebSocketServletFactory factory)
{
factory.getPolicy().setIdleTimeout(10000);
factory.getPolicy().setMaxTextMessageSize(1024 * 1024 * 2);
factory.register(ServerEndpoint.class);
factory.setCreator((req,resp)->
{
ServerEndpoint endpoint = new ServerEndpoint();
serverEndpoints.offer(endpoint);
return endpoint;
});
}
});
context.addServlet(holder, "/ws");
Expand Down Expand Up @@ -353,29 +361,42 @@ public void testWriteException() throws Exception
// client confirms connection via echo
confirmConnection(clientSocket, clientConnectFuture);

// setup client endpoint for write failure (test only)
EndPoint endp = clientSocket.getEndPoint();
endp.shutdownOutput();
try
{
// Block on the server so that the server does not detect a read failure
clientSocket.getSession().getRemote().sendString("block");

// client enqueue close frame
// should result in a client write failure
final String origCloseReason = "Normal Close from Client";
clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason);
// setup client endpoint for write failure (test only)
EndPoint endp = clientSocket.getEndPoint();
endp.shutdownOutput();

assertThat("OnError Latch", clientSocket.errorLatch.await(2, SECONDS), is(true));
assertThat("OnError", clientSocket.error.get(), instanceOf(EofException.class));
// client enqueue close frame
// should result in a client write failure
final String origCloseReason = "Normal Close from Client";
clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason);

// client triggers close event on client ws-endpoint
// assert - close code==1006 (abnormal) or code==1001 (shutdown)
clientSocket.assertReceivedCloseEvent(timeout, anyOf(is(StatusCode.SHUTDOWN), is(StatusCode.ABNORMAL)));
assertThat("OnError Latch", clientSocket.errorLatch.await(2, SECONDS), is(true));
assertThat("OnError", clientSocket.error.get(), instanceOf(EofException.class));

clientSessionTracker.assertClosedProperly(client);
// client triggers close event on client ws-endpoint
// assert - close code==1006 (abnormal)
clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.ABNORMAL), null);
clientSessionTracker.assertClosedProperly(client);

assertThat(serverEndpoints.size(), is(1));
}
finally
{
for (ServerEndpoint endpoint : serverEndpoints)
endpoint.block.countDown();
}
}

public static class ServerEndpoint implements WebSocketFrameListener, WebSocketListener
{
private static final Logger LOG = Log.getLogger(ServerEndpoint.class);
private Session session;
CountDownLatch block = new CountDownLatch(1);

@Override
public void onWebSocketBinary(byte[] payload, int offset, int len)
Expand All @@ -395,15 +416,22 @@ public void onWebSocketText(String message)
String bigmsg = new String(buf, UTF_8);
session.getRemote().sendString(bigmsg);
}
else if (message.equals("block"))
{
LOG.debug("blocking");
assertTrue(block.await(5, TimeUnit.MINUTES));
LOG.debug("unblocked");
}
else
{
// simple echo
session.getRemote().sendString(message);
}
}
catch (IOException ignore)
catch (Throwable t)
{
LOG.debug(ignore);
LOG.debug(t);
throw new RuntimeException(t);
}
}

Expand All @@ -422,9 +450,7 @@ public void onWebSocketConnect(Session session)
public void onWebSocketError(Throwable cause)
{
if (LOG.isDebugEnabled())
{
LOG.debug(cause);
}
LOG.debug("onWebSocketError(): ", cause);
}

@Override
Expand Down
Expand Up @@ -94,8 +94,8 @@ private Flusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoin
@Override
public void onCompleteFailure(Throwable failure)
{
super.onCompleteFailure(failure);
AbstractWebSocketConnection.this.close(failure);
super.onCompleteFailure(failure);
}
}

Expand Down

0 comments on commit bb8e555

Please sign in to comment.